Author: jgomes
Date: Thu Nov 19 21:11:57 2009
New Revision: 882297

URL: http://svn.apache.org/viewvc?rev=882297&view=rev
Log:
Fixed timeout calculations in Dequeue().

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=882297&r1=882296&r2=882297&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 Thu Nov 19 21:11:57 2009
@@ -29,8 +29,8 @@
                DeliveredAck = 0, // Message delivered but not consumed
                PoisonAck = 1, // Message could not be processed due to poison 
pill but discard anyway
                ConsumedAck = 2, // Message consumed, discard
-        RedeliveredAck = 3, // Message has been Redelivered and is not yet 
poisoned.
-        IndividualAck = 4 // Only the given message is to be treated as 
consumed.
+               RedeliveredAck = 3, // Message has been Redelivered and is not 
yet poisoned.
+               IndividualAck = 4 // Only the given message is to be treated as 
consumed.
        }
 
        /// <summary>
@@ -38,40 +38,40 @@
        /// </summary>
        public class MessageConsumer : IMessageConsumer, IDispatcher
        {
-        private readonly MessageDispatchChannel unconsumedMessages = new 
MessageDispatchChannel();
-        private readonly LinkedList<MessageDispatch> dispatchedMessages = new 
LinkedList<MessageDispatch>();
-        private readonly ConsumerInfo info;
-        private Session session;
+               private readonly MessageDispatchChannel unconsumedMessages = 
new MessageDispatchChannel();
+               private readonly LinkedList<MessageDispatch> dispatchedMessages 
= new LinkedList<MessageDispatch>();
+               private readonly ConsumerInfo info;
+               private Session session;
 
-        private MessageAck pendingAck = null;
+               private MessageAck pendingAck = null;
 
-        private Atomic<bool> started = new Atomic<bool>();
-        private Atomic<bool> deliveringAcks = new Atomic<bool>();
+               private Atomic<bool> started = new Atomic<bool>();
+               private Atomic<bool> deliveringAcks = new Atomic<bool>();
 
                private int maximumRedeliveryCount = 10;
                private int redeliveryTimeout = 500;
                protected bool disposed = false;
-        private long lastDeliveredSequenceId = 0;
-        private int deliveredCounter = 0;
-        private int additionalWindowSize = 0;
-        private long redeliveryDelay = 0;
-        private int dispatchedCount = 0;
-        private volatile bool synchronizationRegistered = false;
-        private bool clearDispatchList = false;
+               private long lastDeliveredSequenceId = 0;
+               private int deliveredCounter = 0;
+               private int additionalWindowSize = 0;
+               private long redeliveryDelay = 0;
+               private int dispatchedCount = 0;
+               private volatile bool synchronizationRegistered = false;
+               private bool clearDispatchList = false;
 
-        private const int DEFAULT_REDELIVERY_DELAY = 0;
-        private const int DEFAULT_MAX_REDELIVERIES = 5;
+               private const int DEFAULT_REDELIVERY_DELAY = 0;
+               private const int DEFAULT_MAX_REDELIVERIES = 5;
 
-        private event MessageListener listener;
+               private event MessageListener listener;
+
+               private IRedeliveryPolicy redeliveryPolicy;
 
-        private IRedeliveryPolicy redeliveryPolicy;
-        
                // Constructor internal to prevent clients from creating an 
instance.
                internal MessageConsumer(Session session, ConsumerInfo info)
                {
                        this.session = session;
                        this.info = info;
-            this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
+                       this.redeliveryPolicy = 
this.session.Connection.RedeliveryPolicy;
                }
 
                ~MessageConsumer()
@@ -79,13 +79,13 @@
                        Dispose(false);
                }
 
-        #region Property Accessors
+               #region Property Accessors
+
+               public long LastDeliveredSequenceId
+               {
+                       get { return this.lastDeliveredSequenceId; }
+               }
 
-        public long LastDeliveredSequenceId
-        {
-            get{ return this.lastDeliveredSequenceId; }
-        }
-        
                public ConsumerId ConsumerId
                {
                        get { return info.ConsumerId; }
@@ -103,18 +103,18 @@
                        set { redeliveryTimeout = value; }
                }
 
-        public int PrefetchSize
-        {
-            get { return this.info.PrefetchSize; }
-        }
-
-        public IRedeliveryPolicy RedeliveryPolicy
-        {
-            get { return this.redeliveryPolicy; }
-            set { this.redeliveryPolicy = value; }
-        }
+               public int PrefetchSize
+               {
+                       get { return this.info.PrefetchSize; }
+               }
+
+               public IRedeliveryPolicy RedeliveryPolicy
+               {
+                       get { return this.redeliveryPolicy; }
+                       set { this.redeliveryPolicy = value; }
+               }
 
-        #endregion
+               #endregion
 
                #region IMessageConsumer Members
 
@@ -122,104 +122,104 @@
                {
                        add
                        {
-                CheckClosed();
+                               CheckClosed();
+
+                               if(this.PrefetchSize == 0)
+                               {
+                                       throw new NMSException("Cannot set 
Asynchronous Listener on a Consumer with a zero Prefetch size");
+                               }
+
+                               bool wasStarted = this.session.Started;
+
+                               if(wasStarted == true)
+                               {
+                                       this.session.Stop();
+                               }
 
-                if(this.PrefetchSize == 0)
-                {
-                    throw new NMSException("Cannot set Asynchronous Listener 
on a Consumer with a zero Prefetch size");
-                }
-
-                bool wasStarted = this.session.Started;
-
-                if(wasStarted == true)
-                {
-                    this.session.Stop();
-                }
-                
                                listener += value;
-                this.session.Redispatch(this.unconsumedMessages);
+                               
this.session.Redispatch(this.unconsumedMessages);
 
-                if(wasStarted == true)
-                {
-                    this.session.Start();
-                }
+                               if(wasStarted == true)
+                               {
+                                       this.session.Start();
+                               }
                        }
                        remove { listener -= value; }
                }
 
                public IMessage Receive()
                {
-            CheckClosed();
-            CheckMessageListener();
-    
-            SendPullRequest(0);
-            MessageDispatch dispatch = 
this.Dequeue(TimeSpan.FromMilliseconds(-1));
-            
-            if(dispatch == null)
-            {
-                return null;
-            }
-    
-            BeforeMessageIsConsumed(dispatch);
-            AfterMessageIsConsumed(dispatch, false);
-    
-            return CreateActiveMQMessage(dispatch);
+                       CheckClosed();
+                       CheckMessageListener();
+
+                       SendPullRequest(0);
+                       MessageDispatch dispatch = 
this.Dequeue(TimeSpan.FromMilliseconds(-1));
+
+                       if(dispatch == null)
+                       {
+                               return null;
+                       }
+
+                       BeforeMessageIsConsumed(dispatch);
+                       AfterMessageIsConsumed(dispatch, false);
+
+                       return CreateActiveMQMessage(dispatch);
                }
 
                public IMessage Receive(TimeSpan timeout)
                {
-            CheckClosed();
-            CheckMessageListener();
-    
-            MessageDispatch dispatch = null;
-            SendPullRequest((long)timeout.TotalMilliseconds);
-            
-            if(this.PrefetchSize == 0)
-            {
-                dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
-            }
-            else
-            {
-                dispatch = this.Dequeue(timeout);
-            }
-            
-            if(dispatch == null)
-            {
-                return null;
-            }
-    
-            BeforeMessageIsConsumed(dispatch);
-            AfterMessageIsConsumed(dispatch, false);
-            
-            return CreateActiveMQMessage(dispatch);
+                       CheckClosed();
+                       CheckMessageListener();
+
+                       MessageDispatch dispatch = null;
+                       SendPullRequest((long) timeout.TotalMilliseconds);
+
+                       if(this.PrefetchSize == 0)
+                       {
+                               dispatch = 
this.Dequeue(TimeSpan.FromMilliseconds(-1));
+                       }
+                       else
+                       {
+                               dispatch = this.Dequeue(timeout);
+                       }
+
+                       if(dispatch == null)
+                       {
+                               return null;
+                       }
+
+                       BeforeMessageIsConsumed(dispatch);
+                       AfterMessageIsConsumed(dispatch, false);
+
+                       return CreateActiveMQMessage(dispatch);
                }
 
                public IMessage ReceiveNoWait()
                {
-            CheckClosed();
-            CheckMessageListener();
+                       CheckClosed();
+                       CheckMessageListener();
 
-            MessageDispatch dispatch = null;
-            SendPullRequest(-1);
-            
-            if(this.PrefetchSize == 0)
-            {
-                dispatch = this.Dequeue(TimeSpan.FromMilliseconds(-1));
-            }
-            else
-            {
-                dispatch = this.Dequeue(TimeSpan.Zero);
-            }
-            
-            if(dispatch == null)
-            {
-                return null;
-            }
-    
-            BeforeMessageIsConsumed(dispatch);
-            AfterMessageIsConsumed(dispatch, false);
-    
-            return CreateActiveMQMessage(dispatch);
+                       MessageDispatch dispatch = null;
+                       SendPullRequest(-1);
+
+                       if(this.PrefetchSize == 0)
+                       {
+                               dispatch = 
this.Dequeue(TimeSpan.FromMilliseconds(-1));
+                       }
+                       else
+                       {
+                               dispatch = this.Dequeue(TimeSpan.Zero);
+                       }
+
+                       if(dispatch == null)
+                       {
+                               return null;
+                       }
+
+                       BeforeMessageIsConsumed(dispatch);
+                       AfterMessageIsConsumed(dispatch, false);
+
+                       return CreateActiveMQMessage(dispatch);
                }
 
                public void Dispose()
@@ -242,8 +242,8 @@
 
                        try
                        {
-                Close();
-            }
+                               Close();
+                       }
                        catch
                        {
                                // Ignore network errors.
@@ -254,53 +254,53 @@
 
                public void Close()
                {
-            if(!this.unconsumedMessages.Closed)
-            {
-                if(this.session.IsTransacted && 
this.session.TransactionContext.InTransaction) 
-                {
-                    this.session.TransactionContext.AddSynchronization(new 
ConsumerCloseSynchronization(this));
-                } 
-                else
-                {
-                    this.DoClose();
-                } 
-            }
-               }
-
-        internal void DoClose()
-        {
-            if(!this.unconsumedMessages.Closed)
-            {                    
-                // Do we have any acks we need to send out before closing?
-                // Ack any delivered messages now.
-                if(!this.session.IsTransacted) 
-                {
-                    DeliverAcks();
-                    if(this.IsAutoAcknowledgeBatch)
-                    {
-                        Acknowledge();
-                    }
-                }
-                
-                if(!this.session.IsTransacted)
-                {
-                    lock(this.dispatchedMessages)
-                    {
-                        dispatchedMessages.Clear();
-                    }
-                }
-                
-                this.unconsumedMessages.Close();
-                this.session.DisposeOf(this.info.ConsumerId, 
this.lastDeliveredSequenceId);
-
-                RemoveInfo removeCommand = new RemoveInfo();
-                removeCommand.ObjectId = this.info.ConsumerId;
-                removeCommand.LastDeliveredSequenceId = 
this.lastDeliveredSequenceId;
-                
-                this.session.Connection.Oneway(removeCommand);
-                this.session = null;
-            }
-        }
+                       if(!this.unconsumedMessages.Closed)
+                       {
+                               if(this.session.IsTransacted && 
this.session.TransactionContext.InTransaction)
+                               {
+                                       
this.session.TransactionContext.AddSynchronization(new 
ConsumerCloseSynchronization(this));
+                               }
+                               else
+                               {
+                                       this.DoClose();
+                               }
+                       }
+               }
+
+               internal void DoClose()
+               {
+                       if(!this.unconsumedMessages.Closed)
+                       {
+                               // Do we have any acks we need to send out 
before closing?
+                               // Ack any delivered messages now.
+                               if(!this.session.IsTransacted)
+                               {
+                                       DeliverAcks();
+                                       if(this.IsAutoAcknowledgeBatch)
+                                       {
+                                               Acknowledge();
+                                       }
+                               }
+
+                               if(!this.session.IsTransacted)
+                               {
+                                       lock(this.dispatchedMessages)
+                                       {
+                                               dispatchedMessages.Clear();
+                                       }
+                               }
+
+                               this.unconsumedMessages.Close();
+                               this.session.DisposeOf(this.info.ConsumerId, 
this.lastDeliveredSequenceId);
+
+                               RemoveInfo removeCommand = new RemoveInfo();
+                               removeCommand.ObjectId = this.info.ConsumerId;
+                               removeCommand.LastDeliveredSequenceId = 
this.lastDeliveredSequenceId;
+
+                               this.session.Connection.Oneway(removeCommand);
+                               this.session = null;
+                       }
+               }
 
                #endregion
 
@@ -315,716 +315,739 @@
                                messagePull.ResponseRequired = false;
 
                                Tracer.Debug("Sending MessagePull: " + 
messagePull);
-                           session.Connection.Oneway(messagePull);
+                               session.Connection.Oneway(messagePull);
+                       }
+               }
+
+               protected void DoIndividualAcknowledge(ActiveMQMessage message)
+               {
+                       MessageDispatch dispatch = null;
+
+                       lock(this.dispatchedMessages)
+                       {
+                               foreach(MessageDispatch originalDispatch in 
this.dispatchedMessages)
+                               {
+                                       
if(originalDispatch.Message.MessageId.Equals(message.MessageId))
+                                       {
+                                               dispatch = originalDispatch;
+                                               
this.dispatchedMessages.Remove(originalDispatch);
+                                               break;
+                                       }
+
+                                       return;
+                               }
                        }
+
+                       MessageAck ack = new MessageAck();
+
+                       ack.AckType = (byte) AckType.IndividualAck;
+                       ack.ConsumerId = this.info.ConsumerId;
+                       ack.Destination = dispatch.Destination;
+                       ack.LastMessageId = dispatch.Message.MessageId;
+                       ack.MessageCount = 1;
+
+                       this.session.Connection.Oneway(ack);
                }
 
-        protected void DoIndividualAcknowledge(ActiveMQMessage message)
-        {
-            MessageDispatch dispatch = null;
-            
-            lock(this.dispatchedMessages)
-            {
-                foreach(MessageDispatch originalDispatch in 
this.dispatchedMessages)
-                {
-                    
if(originalDispatch.Message.MessageId.Equals(message.MessageId))
-                    {
-                        dispatch = originalDispatch;
-                        this.dispatchedMessages.Remove(originalDispatch);
-                        break;
-                    }
-
-                    return;
-                }
-            }
-
-            MessageAck ack = new MessageAck();
-
-            ack.AckType = (byte)AckType.IndividualAck;
-            ack.ConsumerId = this.info.ConsumerId;
-            ack.Destination = dispatch.Destination;
-            ack.LastMessageId = dispatch.Message.MessageId;
-            ack.MessageCount = 1;            
-
-            this.session.Connection.Oneway(ack);            
-        }
-                
                protected void DoNothingAcknowledge(ActiveMQMessage message)
                {
                }
 
                protected void DoClientAcknowledge(ActiveMQMessage message)
                {
-            this.CheckClosed();
+                       this.CheckClosed();
                        Tracer.Debug("Sending Client Ack:");
-            this.session.Acknowledge();
+                       this.session.Acknowledge();
+               }
+
+               public void Start()
+               {
+                       if(this.unconsumedMessages.Closed)
+                       {
+                               return;
+                       }
+
+                       this.started.Value = true;
+                       this.unconsumedMessages.Start();
+                       this.session.Executor.Wakeup();
                }
 
-        public void Start()
-        {
-            if(this.unconsumedMessages.Closed)
-            {
-                return;
-            }
-
-            this.started.Value = true;
-            this.unconsumedMessages.Start();
-            this.session.Executor.Wakeup();
-        }
-    
-        public void Stop()
-        {
-            this.started.Value = false;
-            this.unconsumedMessages.Stop();
-        }
-
-        public void ClearMessagesInProgress()
-        {
-            // we are called from inside the transport reconnection logic
-            // which involves us clearing all the connections' consumers
-            // dispatch lists and clearing them
-            // so rather than trying to grab a mutex (which could be already
-            // owned by the message listener calling the send) we will just set
-            // a flag so that the list can be cleared as soon as the
-            // dispatch thread is ready to flush the dispatch list
-            this.clearDispatchList = true;
-        }
-
-        public void DeliverAcks()
-        {
-            MessageAck ack = null;
-            
-            if(this.deliveringAcks.CompareAndSet(false, true))
-            {
-                if(this.IsAutoAcknowledgeEach)
-                {
-                    lock(this.dispatchedMessages)
-                    {
-                        ack = 
MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
-                        if(ack != null) 
-                        {
-                            this.dispatchedMessages.Clear();
-                        } 
-                        else
-                        {
-                            ack = this.pendingAck;
-                            this.pendingAck = null;
-                        }
-                    }
-                } 
-                else if(pendingAck != null && pendingAck.AckType == 
(byte)AckType.ConsumedAck) 
-                {
-                    ack = pendingAck;
-                    pendingAck = null;
-                }
-                
-                if(ack != null)
-                {
-                    MessageAck ackToSend = ack;
-                    
-                    try
-                    {
-                        this.session.Connection.Oneway(ackToSend);
-                    }
-                    catch(Exception e)
-                    {
-                        Tracer.DebugFormat("{0} : Failed to send ack, {1}", 
this.info.ConsumerId, e);
-                    }
-                } 
-                else
-                {
-                    this.deliveringAcks.Value = false;
-                }
-            }
-        }
-
-        public void Dispatch(MessageDispatch dispatch)
-        {
-            MessageListener listener = this.listener;
-            
-            try 
-            {
-                lock(this.unconsumedMessages.SyncRoot) 
-                {
-                    if(this.clearDispatchList)
-                    {
-                        // we are reconnecting so lets flush the in progress 
messages
-                        this.clearDispatchList = false;
-                        this.unconsumedMessages.Clear();
-                        
-                        if(this.pendingAck != null && this.pendingAck.AckType 
== (byte)AckType.DeliveredAck) 
-                        {
-                            // on resumption a pending delivered ack will be 
out of sync with
-                            // re-deliveries.
-                            Tracer.Debug("removing pending delivered ack on 
transport interupt: " + pendingAck);
-                            this.pendingAck = null;
-                        }
-                    }
-                    
-                    if(!this.unconsumedMessages.Closed) 
-                    {
-                        if(listener != null && this.unconsumedMessages.Running)
-                        {
-                            ActiveMQMessage message = 
CreateActiveMQMessage(dispatch);
-                            
-                            this.BeforeMessageIsConsumed(dispatch);
-                            
-                            try
-                            {
-                                bool expired = message.IsExpired();
-                                
-                                if(!expired)
-                                {
-                                    listener(message);
-                                }
-                                
-                                this.AfterMessageIsConsumed(dispatch, expired);
-                            } 
-                            catch(Exception e) 
-                            {
-                                if(IsAutoAcknowledgeBatch || 
IsAutoAcknowledgeEach || this.session.IsIndividualAcknowledge)
-                                {
-                                    // Redeliver the message
-                                } 
-                                else
-                                {
-                                    // Transacted or Client ack: Deliver the 
next message.
-                                    this.AfterMessageIsConsumed(dispatch, 
false);
-                                }
-
-                                Tracer.Error(this.info.ConsumerId + " 
Exception while processing message: " + e);
-                            }
-                        } 
-                        else 
-                        {
-                            this.unconsumedMessages.Enqueue(dispatch);
-                        }
-                    }
-                }
-                
-                if(++dispatchedCount % 1000 == 0) 
-                {
-                    dispatchedCount = 0;
-                    Thread.Sleep(1);
-                }
-            } 
-            catch(Exception e) 
-            {
-                this.session.Connection.OnSessionException(this.session, e);
-            }
-        }
-
-        public bool Iterate()
-        {
-            if(this.listener != null) 
-            {
-                MessageDispatch dispatch = 
this.unconsumedMessages.DequeueNoWait();
-                if(dispatch != null) 
-                {
-                    try 
-                    {
-                        ActiveMQMessage message = 
CreateActiveMQMessage(dispatch);
-                        BeforeMessageIsConsumed(dispatch);
-                        listener(message);
-                        AfterMessageIsConsumed(dispatch, false);
-                    } 
-                    catch(NMSException e) 
-                    {
-                        
this.session.Connection.OnSessionException(this.session, e);
-                    }
-                    
-                    return true;
-                }
-            }
-            
-            return false;
-        }
-
-        /// <summary>
-        /// Used to get an enqueued message from the unconsumedMessages list. 
The
-        /// amount of time this method blocks is based on the timeout value.  
if
-        /// timeout == Timeout.Infinite then it blocks until a message is 
received. 
-        /// if timeout == 0 then it it tries to not block at all, it returns a 
-        /// message if it is available if timeout > 0 then it blocks up to 
timeout 
-        /// amount of time.  Expired messages will consumed by this method.
-        /// </summary>
-        /// <param name="timeout">
-        /// A <see cref="System.Int64"/>
-        /// </param>
-        /// <returns>
-        /// A <see cref="MessageDispatch"/>
-        /// </returns>
-        private MessageDispatch Dequeue(TimeSpan timeout) 
-        {
-            DateTime deadline = DateTime.Now;
-            
-            if(timeout > TimeSpan.Zero)
-            {
-                deadline = DateTime.Now + timeout;
-            }
-            
-            while(true) 
-            {
-                MessageDispatch dispatch = 
this.unconsumedMessages.Dequeue(timeout);
-                
-                if(dispatch == null)
-                {
-                    if(timeout > TimeSpan.Zero && 
!this.unconsumedMessages.Closed) 
-                    {
-                        timeout = deadline < DateTime.Now ? TimeSpan.Zero : 
deadline - DateTime.Now;
-                    }
-                    else
-                    {
-                        return null;
-                    }
-                } 
-                else if(dispatch.Message == null) 
-                {
-                    return null;
-                } 
-                else if(dispatch.Message.IsExpired())
-                {
-                    Tracer.DebugFormat("{0} received expired message: {1}", 
info.ConsumerId, dispatch.Message.MessageId);
-                    
-                    BeforeMessageIsConsumed(dispatch);
-                    AfterMessageIsConsumed(dispatch, true);
-                    
-                    if(timeout > TimeSpan.Zero && 
!this.unconsumedMessages.Closed) 
-                    {
-                        timeout = deadline < DateTime.Now ? TimeSpan.Zero : 
deadline - DateTime.Now;
-                    }
-                } 
-                else 
-                {
-                    return dispatch;
-                }
-            }
-        }
-
-        public void BeforeMessageIsConsumed(MessageDispatch dispatch)
-        {
-            this.lastDeliveredSequenceId = 
dispatch.Message.MessageId.BrokerSequenceId;
-            
-            if(!IsAutoAcknowledgeBatch)
-            {
-                lock(this.dispatchedMessages) 
-                {
-                    this.dispatchedMessages.AddFirst(dispatch);
-                }
-                
-                if(this.session.IsTransacted) 
-                {
-                    this.AckLater(dispatch, AckType.DeliveredAck);
-                }
-            }            
-        }
-        
-        public void AfterMessageIsConsumed(MessageDispatch dispatch, bool 
expired)
-        {
-            if(this.unconsumedMessages.Closed) 
-            {
-                return;
-            }
-            
-            if(expired == true) 
-            {
-                lock(this.dispatchedMessages)
-                {
-                    this.dispatchedMessages.Remove(dispatch);
-                }
-                
-                AckLater(dispatch, AckType.DeliveredAck);
-            }
-            else
-            {
-                if(this.session.IsTransacted)
-                {
-                    // Do nothing.
-                }
-                else if(this.IsAutoAcknowledgeEach)
-                {
-                    if(this.deliveringAcks.CompareAndSet(false, true)) 
-                    {
-                        lock(this.dispatchedMessages) 
-                        {
-                            if(this.dispatchedMessages.Count != 0)
-                            {
-                                MessageAck ack = 
MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
-                                if(ack !=null) 
-                                {
-                                    this.dispatchedMessages.Clear();
-                                    this.session.Connection.Oneway(ack);
-                                }
-                            }
-                        }
-                        this.deliveringAcks.Value = false;
-                    }
-                } 
-                else if(this.IsAutoAcknowledgeBatch)
-                {
-                    AckLater(dispatch, AckType.ConsumedAck);
-                }
-                else if(this.session.IsClientAcknowledge || 
this.session.IsIndividualAcknowledge) 
-                {
-                    AckLater(dispatch, AckType.DeliveredAck);
-                } 
-                else 
-                {
-                    throw new NMSException("Invalid session state.");
-                }
-            }            
-        }
-
-        private MessageAck MakeAckForAllDeliveredMessages(AckType type)
-        {
-            lock(this.dispatchedMessages)
-            {
-                if(this.dispatchedMessages.Count == 0)
-                {
-                    return null;
-                }
-                    
-                MessageDispatch dispatch = this.dispatchedMessages.First.Value;
-                MessageAck ack = new MessageAck();
-                
-                ack.AckType = (byte)type;
-                ack.ConsumerId = this.info.ConsumerId;
-                ack.Destination = dispatch.Destination;
-                ack.LastMessageId = dispatch.Message.MessageId;
-                ack.MessageCount = this.dispatchedMessages.Count;
-                ack.FirstMessageId = 
this.dispatchedMessages.Last.Value.Message.MessageId;
-                
-                return ack;
-            }
-        }
-
-        private void AckLater(MessageDispatch dispatch, AckType type)
-        {
-            // Don't acknowledge now, but we may need to let the broker know 
the
-            // consumer got the message to expand the pre-fetch window
-            if(this.session.IsTransacted)
-            {
-                this.session.DoStartTransaction();
-                
-                if(!synchronizationRegistered) 
-                {
-                    this.synchronizationRegistered = true;
-                    this.session.TransactionContext.AddSynchronization(new 
MessageConsumerSynchronization(this));
-                }
-            }
-
-            this.deliveredCounter++;
-            
-            MessageAck oldPendingAck = pendingAck;
-            
-            pendingAck = new MessageAck();
-            pendingAck.AckType = (byte)type;
-            pendingAck.ConsumerId = this.info.ConsumerId;
-            pendingAck.Destination = dispatch.Destination;
-            pendingAck.LastMessageId = dispatch.Message.MessageId;
-            pendingAck.MessageCount = deliveredCounter;
-
-            if(this.session.IsTransacted && 
this.session.TransactionContext.InTransaction)
-            {
-                pendingAck.TransactionId = 
this.session.TransactionContext.TransactionId;
-            }
-            
-            if(oldPendingAck == null) 
-            {
-                pendingAck.FirstMessageId = pendingAck.LastMessageId;
-            } 
-            else if(oldPendingAck.AckType == pendingAck.AckType)
-            {
-                pendingAck.FirstMessageId = oldPendingAck.FirstMessageId;
-            } 
-            else 
-            {
-                // old pending ack being superseded by ack of another type, if 
is is not a delivered
-                // ack and hence important, send it now so it is not lost.
-                if(oldPendingAck.AckType != (byte)AckType.DeliveredAck) 
-                {
-                    Tracer.Debug("Sending old pending ack " + oldPendingAck + 
", new pending: " + pendingAck);
-                    this.session.Connection.Oneway(oldPendingAck);
-                } 
-                else
-                {
-                    Tracer.Debug("dropping old pending ack " + oldPendingAck + 
", new pending: " + pendingAck);
-                }
-            }
-            
-            if((0.5 * this.info.PrefetchSize) <= (this.deliveredCounter - 
this.additionalWindowSize)) 
-            {
-                this.session.Connection.Oneway(pendingAck);
-                this.pendingAck = null;
-                this.deliveredCounter = 0;
-                this.additionalWindowSize = 0;
-            }
-        }
-
-        internal void Acknowledge()
-        {
-            lock(this.dispatchedMessages)
-            {
-                // Acknowledge all messages so far.
-                MessageAck ack = 
MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
-                
-                if(ack == null)
-                {
-                    return; // no msgs
-                }
-                
-                if(this.session.IsTransacted)
-                {
-                    this.session.DoStartTransaction();
-                    ack.TransactionId = 
this.session.TransactionContext.TransactionId;
-                }
-                
-                this.session.Connection.Oneway(ack);
-                this.pendingAck = null;
-                
-                // Adjust the counters
-                this.deliveredCounter = Math.Max(0, this.deliveredCounter - 
this.dispatchedMessages.Count);
-                this.additionalWindowSize = Math.Max(0, 
this.additionalWindowSize - this.dispatchedMessages.Count);
-                
-                if(!this.session.IsTransacted) 
-                {
-                    this.dispatchedMessages.Clear();
-                } 
-            }            
-        }        
-
-        private void Commit()
-        {
-            lock(this.dispatchedMessages)
-            {
-                this.dispatchedMessages.Clear();
-            }
-            
-            this.redeliveryDelay = 0;
-        }
-
-        private void Rollback()
-        {
-            lock(this.unconsumedMessages.SyncRoot)
-            {
-                lock(this.dispatchedMessages)
-                {
-                    if(this.dispatchedMessages.Count == 0)
-                    {
-                        return;
-                    }
-        
-                    // Only increase the redelivery delay after the first 
redelivery..
-                    MessageDispatch lastMd = 
this.dispatchedMessages.First.Value;
-                    int currentRedeliveryCount = 
lastMd.Message.RedeliveryCounter;
-                    
-                    redeliveryDelay = 
this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
-                    
-                    MessageId firstMsgId = 
this.dispatchedMessages.Last.Value.Message.MessageId;
-
-                    foreach(MessageDispatch dispatch in 
this.dispatchedMessages)
-                    {
-                        // Allow the message to update its internal to reflect 
a Rollback.
-                        dispatch.Message.OnMessageRollback();
-                    }
-        
-                    if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
-                       lastMd.Message.RedeliveryCounter > 
this.redeliveryPolicy.MaximumRedeliveries)
-                    {
-                        // We need to NACK the messages so that they get sent 
to the DLQ.
-                        MessageAck ack = new MessageAck();
-
-                        ack.AckType = (byte)AckType.PoisonAck;
-                        ack.ConsumerId = this.info.ConsumerId;
-                        ack.Destination = lastMd.Destination;
-                        ack.LastMessageId = lastMd.Message.MessageId;
-                        ack.MessageCount = this.dispatchedMessages.Count;      
                              
-                        ack.FirstMessageId = firstMsgId;
-
-                        this.session.Connection.Oneway(ack);
-                        
-                        // Adjust the window size.
-                        additionalWindowSize = Math.Max(0, 
this.additionalWindowSize - this.dispatchedMessages.Count);
-                        
-                        this.redeliveryDelay = 0;
-                    } 
-                    else
-                    {                        
-                        // We only send a RedeliveryAck after the first 
redelivery
-                        if(currentRedeliveryCount > 0)
-                        {
-                            MessageAck ack = new MessageAck();
-                            
-                            ack.AckType = (byte)AckType.RedeliveredAck;
-                            ack.ConsumerId = this.info.ConsumerId;
-                            ack.Destination = lastMd.Destination;
-                            ack.LastMessageId = lastMd.Message.MessageId;
-                            ack.MessageCount = this.dispatchedMessages.Count;  
                                  
-                            ack.FirstMessageId = firstMsgId;
-                            
-                            this.session.Connection.Oneway(ack);
-                        }
-        
-                        // stop the delivery of messages.
-                        this.unconsumedMessages.Stop();
-
-                        foreach(MessageDispatch dispatch in 
this.dispatchedMessages)
-                        {
-                            this.unconsumedMessages.EnqueueFirst(dispatch);
-                        }
-        
-                        if(redeliveryDelay > 0 && 
!this.unconsumedMessages.Closed)
-                        {
-                            DateTime deadline = 
DateTime.Now.AddMilliseconds(redeliveryDelay);
-                            ThreadPool.QueueUserWorkItem(this.RollbackHelper, 
deadline);
-                        } 
-                        else 
-                        {
-                            Start();
-                        }
-                    }
-                    
-                    this.deliveredCounter -= this.dispatchedMessages.Count;
-                    this.dispatchedMessages.Clear();
-                }
-            }
-
-            // Only redispatch if there's an async listener otherwise a 
synchronous
-            // consumer will pull them from the local queue.
-            if(this.listener != null) 
-            {
-                this.session.Redispatch(this.unconsumedMessages);
-            }
-        }
-        
-        private void RollbackHelper(Object arg)
-        {
-            try
-            {
-                TimeSpan waitTime = (DateTime) arg - DateTime.Now;
-
-                if(waitTime.CompareTo(TimeSpan.Zero) > 0)
-                {
-                    Thread.Sleep(waitTime);
-                }
-                
-                this.Start();
-            }            
-            catch(Exception e)
-            {
+               /// <summary>
+               /// Marks this consumer as not started and stops the channel that holds
+               /// the messages which have not been consumed yet.
+               /// </summary>
+               public void Stop()
+               {
+                       this.started.Value = false;
+                       this.unconsumedMessages.Stop();
+               }
+
+               /// <summary>
+               /// Requests that the messages currently in progress be discarded.  Only a
+               /// flag is set here; the flag is checked and acted upon in Dispatch().
+               /// </summary>
+               public void ClearMessagesInProgress()
+               {
+                       // We are called from inside the transport reconnection logic, which
+                       // involves clearing the dispatch lists of all of the connection's
+                       // consumers.  Rather than trying to grab a mutex (which could already
+                       // be owned by the message listener calling the send) we just set a
+                       // flag so that the list can be cleared as soon as the dispatch thread
+                       // is ready to flush the dispatch list.
+                       this.clearDispatchList = true;
+               }
+
+               /// <summary>
+               /// Sends the acknowledgement that is currently waiting to go to the broker,
+               /// if there is one.  In auto-acknowledge-each mode a DeliveredAck covering
+               /// all dispatched messages is built (falling back to the pending ack when
+               /// nothing has been dispatched); otherwise only a pending ConsumedAck is sent.
+               /// The deliveringAcks flag guards against concurrent delivery and is always
+               /// released before this method returns.  Failures while sending are traced
+               /// and not propagated.
+               /// </summary>
+               public void DeliverAcks()
+               {
+                       if(!this.deliveringAcks.CompareAndSet(false, true))
+                       {
+                               // Somebody else is already delivering acks.
+                               return;
+                       }
+
+                       try
+                       {
+                               MessageAck ack = null;
+
+                               if(this.IsAutoAcknowledgeEach)
+                               {
+                                       lock(this.dispatchedMessages)
+                                       {
+                                               ack = MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
+                                               if(ack != null)
+                                               {
+                                                       this.dispatchedMessages.Clear();
+                                               }
+                                               else
+                                               {
+                                                       ack = this.pendingAck;
+                                                       this.pendingAck = null;
+                                               }
+                                       }
+                               }
+                               else if(pendingAck != null && pendingAck.AckType == (byte) AckType.ConsumedAck)
+                               {
+                                       ack = pendingAck;
+                                       pendingAck = null;
+                               }
+
+                               if(ack != null)
+                               {
+                                       try
+                                       {
+                                               this.session.Connection.Oneway(ack);
+                                       }
+                                       catch(Exception e)
+                                       {
+                                               Tracer.DebugFormat("{0} : Failed to send ack, {1}", this.info.ConsumerId, e);
+                                       }
+                               }
+                       }
+                       finally
+                       {
+                               // Release the guard on every path.  Previously it was only reset
+                               // when there was nothing to send, so after the first ack went out
+                               // the flag stayed set and no further acks could be delivered.
+                               this.deliveringAcks.Value = false;
+                       }
+               }
+
+               /// <summary>
+               /// Handles a message dispatch for this consumer.  While holding the
+               /// unconsumed channel's SyncRoot it first honours a pending
+               /// clearDispatchList request, then, unless the channel is closed, either
+               /// hands the message to the registered listener (when one is set and the
+               /// channel is running) or enqueues it on the unconsumed channel.  Any
+               /// exception escaping that work is reported through
+               /// Connection.OnSessionException.
+               /// </summary>
+               public void Dispatch(MessageDispatch dispatch)
+               {
+                       // Read the listener field once so the null check and the invocation
+                       // below use the same delegate instance.
+                       MessageListener listener = this.listener;
+
+                       try
+                       {
+                               lock(this.unconsumedMessages.SyncRoot)
+                               {
+                                       if(this.clearDispatchList)
+                                       {
+                                               // we are reconnecting so lets 
flush the in progress messages
+                                               this.clearDispatchList = false;
+                                               this.unconsumedMessages.Clear();
+
+                                               if(this.pendingAck != null && 
this.pendingAck.AckType == (byte) AckType.DeliveredAck)
+                                               {
+                                                       // on resumption a 
pending delivered ack will be out of sync with
+                                                       // re-deliveries.
+                                                       Tracer.Debug("removing 
pending delivered ack on transport interupt: " + pendingAck);
+                                                       this.pendingAck = null;
+                                               }
+                                       }
+
+                                       if(!this.unconsumedMessages.Closed)
+                                       {
+                                               if(listener != null && 
this.unconsumedMessages.Running)
+                                               {
+                                                       ActiveMQMessage message 
= CreateActiveMQMessage(dispatch);
+
+                                                       
this.BeforeMessageIsConsumed(dispatch);
+
+                                                       try
+                                                       {
+                                                               // Expired messages are not handed to the listener, but
+                                                               // AfterMessageIsConsumed is still told about them.
+                                                               bool expired = 
message.IsExpired();
+
+                                                               if(!expired)
+                                                               {
+                                                                       
listener(message);
+                                                               }
+
+                                                               
this.AfterMessageIsConsumed(dispatch, expired);
+                                                       }
+                                                       catch(Exception e)
+                                                       {
+                                                               
if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || 
this.session.IsIndividualAcknowledge)
+                                                               {
+                                                                       // 
Redeliver the message
+                                                               }
+                                                               else
+                                                               {
+                                                                       // 
Transacted or Client ack: Deliver the next message.
+                                                                       
this.AfterMessageIsConsumed(dispatch, false);
+                                                               }
+
+                                                               
Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + 
e);
+                                                       }
+                                               }
+                                               else
+                                               {
+                                                       // No listener, or the channel is not running: keep the
+                                                       // dispatch on the unconsumed channel.
+                                                       
this.unconsumedMessages.Enqueue(dispatch);
+                                               }
+                                       }
+                               }
+
+                               // After every 1000th dispatch reset the counter and sleep for 1 ms.
+                               if(++dispatchedCount % 1000 == 0)
+                               {
+                                       dispatchedCount = 0;
+                                       Thread.Sleep(1);
+                               }
+                       }
+                       catch(Exception e)
+                       {
+                               
this.session.Connection.OnSessionException(this.session, e);
+                       }
+               }
+
+               /// <summary>
+               /// Delivers at most one queued message to the registered listener.
+               /// Expired messages are not handed to the listener; they are passed to
+               /// AfterMessageIsConsumed with the expired flag set, the same way
+               /// Dispatch() treats them.  An NMSException raised while processing is
+               /// reported through Connection.OnSessionException.
+               /// </summary>
+               /// <returns>
+               /// true if a message was taken from the unconsumed channel, false if there
+               /// is no listener or no message was available.
+               /// </returns>
+               public bool Iterate()
+               {
+                       // Snapshot the delegate so the null check and the invocation use the
+                       // same instance even if the listener field is changed concurrently.
+                       MessageListener listener = this.listener;
+
+                       if(listener == null)
+                       {
+                               return false;
+                       }
+
+                       MessageDispatch dispatch = this.unconsumedMessages.DequeueNoWait();
+                       if(dispatch == null)
+                       {
+                               return false;
+                       }
+
+                       try
+                       {
+                               ActiveMQMessage message = CreateActiveMQMessage(dispatch);
+                               BeforeMessageIsConsumed(dispatch);
+
+                               // Do not hand an expired message to the application.
+                               bool expired = message.IsExpired();
+                               if(!expired)
+                               {
+                                       listener(message);
+                               }
+
+                               AfterMessageIsConsumed(dispatch, expired);
+                       }
+                       catch(NMSException e)
+                       {
+                               this.session.Connection.OnSessionException(this.session, e);
+                       }
+
+                       return true;
+               }
+
+               /// <summary>
+               /// Used to get an enqueued message from the unconsumedMessages list. The
+               /// amount of time this method blocks is based on the timeout value:
+               /// if timeout == Timeout.Infinite it blocks until a message is received;
+               /// if timeout == 0 it tries not to block at all and returns a message only
+               /// if one is available; if timeout > 0 it blocks for up to the timeout
+               /// amount of time.  Expired messages will be consumed by this method and
+               /// are never returned to the caller.
+               /// </summary>
+               /// <param name="timeout">
+               /// A <see cref="System.TimeSpan"/> giving the maximum time to wait.
+               /// </param>
+               /// <returns>
+               /// A <see cref="MessageDispatch"/>, or null if no unexpired message became
+               /// available or a dispatch without a message was dequeued.
+               /// </returns>
+               private MessageDispatch Dequeue(TimeSpan timeout)
+               {
+                       // The deadline is tracked in UTC so that a daylight-saving or time zone
+                       // change during the wait cannot lengthen or shorten the timeout.
+                       DateTime deadline = DateTime.UtcNow;
+
+                       if(timeout > TimeSpan.Zero)
+                       {
+                               deadline += timeout;
+                       }
+
+                       while(true)
+                       {
+                               MessageDispatch dispatch = this.unconsumedMessages.Dequeue(timeout);
+
+                               if(dispatch != null)
+                               {
+                                       if(dispatch.Message == null)
+                                       {
+                                               return null;
+                                       }
+
+                                       if(!dispatch.Message.IsExpired())
+                                       {
+                                               return dispatch;
+                                       }
+
+                                       Tracer.DebugFormat("{0} received expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
+
+                                       // Consume the expired message and go around again.
+                                       BeforeMessageIsConsumed(dispatch);
+                                       AfterMessageIsConsumed(dispatch, true);
+                               }
+
+                               // Only a positive timeout on an open channel is re-armed.
+                               bool canKeepWaiting = (timeout > TimeSpan.Zero) && !this.unconsumedMessages.Closed;
+
+                               if(dispatch == null && !canKeepWaiting)
+                               {
+                                       return null;
+                               }
+
+                               if(canKeepWaiting)
+                               {
+                                       // Grab a single date/time for the calculation to avoid timing errors.
+                                       DateTime now = DateTime.UtcNow;
+
+                                       // Out of time -> zero, which makes the next Dequeue a final
+                                       // non-blocking attempt; otherwise wait for the remaining time.
+                                       timeout = (now > deadline) ? TimeSpan.Zero : (deadline - now);
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Bookkeeping performed just before a dispatch is handed over for
+               /// consumption: remembers the broker sequence id of the message and, unless
+               /// the consumer is in auto-acknowledge-batch mode, puts the dispatch at the
+               /// front of the dispatchedMessages list.  For transacted sessions a
+               /// DeliveredAck is additionally scheduled through AckLater.
+               /// </summary>
+               public void BeforeMessageIsConsumed(MessageDispatch dispatch)
+               {
+                       this.lastDeliveredSequenceId = dispatch.Message.MessageId.BrokerSequenceId;
+
+                       if(IsAutoAcknowledgeBatch)
+                       {
+                               // Batch mode does not track dispatched messages here.
+                               return;
+                       }
+
+                       lock(this.dispatchedMessages)
+                       {
+                               this.dispatchedMessages.AddFirst(dispatch);
+                       }
+
+                       if(!this.session.IsTransacted)
+                       {
+                               return;
+                       }
+
+                       this.AckLater(dispatch, AckType.DeliveredAck);
+               }
+
+               public void AfterMessageIsConsumed(MessageDispatch dispatch, 
bool expired)
+               {
+                       if(this.unconsumedMessages.Closed)
+                       {
+                               return;
+                       }
+
+                       if(expired == true)
+                       {
+                               lock(this.dispatchedMessages)
+                               {
+                                       
this.dispatchedMessages.Remove(dispatch);
+                               }
+
+                               AckLater(dispatch, AckType.DeliveredAck);
+                       }
+                       else
+                       {
+                               if(this.session.IsTransacted)
+                               {
+                                       // Do nothing.
+                               }
+                               else if(this.IsAutoAcknowledgeEach)
+                               {
+                                       
if(this.deliveringAcks.CompareAndSet(false, true))
+                                       {
+                                               lock(this.dispatchedMessages)
+                                               {
+                                                       
if(this.dispatchedMessages.Count != 0)
+                                                       {
+                                                               MessageAck ack 
= MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+                                                               if(ack != null)
+                                                               {
+                                                                       
this.dispatchedMessages.Clear();
+                                                                       
this.session.Connection.Oneway(ack);
+                                                               }
+                                                       }
+                                               }
+                                               this.deliveringAcks.Value = 
false;
+                                       }
+                               }
+                               else if(this.IsAutoAcknowledgeBatch)
+                               {
+                                       AckLater(dispatch, AckType.ConsumedAck);
+                               }
+                               else if(this.session.IsClientAcknowledge || 
this.session.IsIndividualAcknowledge)
+                               {
+                                       AckLater(dispatch, 
AckType.DeliveredAck);
+                               }
+                               else
+                               {
+                                       throw new NMSException("Invalid session 
state.");
+                               }
+                       }
+               }
+
+               private MessageAck MakeAckForAllDeliveredMessages(AckType type)
+               {
+                       lock(this.dispatchedMessages)
+                       {
+                               if(this.dispatchedMessages.Count == 0)
+                               {
+                                       return null;
+                               }
+
+                               MessageDispatch dispatch = 
this.dispatchedMessages.First.Value;
+                               MessageAck ack = new MessageAck();
+
+                               ack.AckType = (byte) type;
+                               ack.ConsumerId = this.info.ConsumerId;
+                               ack.Destination = dispatch.Destination;
+                               ack.LastMessageId = dispatch.Message.MessageId;
+                               ack.MessageCount = 
this.dispatchedMessages.Count;
+                               ack.FirstMessageId = 
this.dispatchedMessages.Last.Value.Message.MessageId;
+
+                               return ack;
+                       }
+               }
+
+               private void AckLater(MessageDispatch dispatch, AckType type)
+               {
+                       // Don't acknowledge now, but we may need to let the 
broker know the
+                       // consumer got the message to expand the pre-fetch 
window
+                       if(this.session.IsTransacted)
+                       {
+                               this.session.DoStartTransaction();
+
+                               if(!synchronizationRegistered)
+                               {
+                                       this.synchronizationRegistered = true;
+                                       
this.session.TransactionContext.AddSynchronization(new 
MessageConsumerSynchronization(this));
+                               }
+                       }
+
+                       this.deliveredCounter++;
+
+                       MessageAck oldPendingAck = pendingAck;
+
+                       pendingAck = new MessageAck();
+                       pendingAck.AckType = (byte) type;
+                       pendingAck.ConsumerId = this.info.ConsumerId;
+                       pendingAck.Destination = dispatch.Destination;
+                       pendingAck.LastMessageId = dispatch.Message.MessageId;
+                       pendingAck.MessageCount = deliveredCounter;
+
+                       if(this.session.IsTransacted && 
this.session.TransactionContext.InTransaction)
+                       {
+                               pendingAck.TransactionId = 
this.session.TransactionContext.TransactionId;
+                       }
+
+                       if(oldPendingAck == null)
+                       {
+                               pendingAck.FirstMessageId = 
pendingAck.LastMessageId;
+                       }
+                       else if(oldPendingAck.AckType == pendingAck.AckType)
+                       {
+                               pendingAck.FirstMessageId = 
oldPendingAck.FirstMessageId;
+                       }
+                       else
+                       {
+                               // old pending ack being superseded by ack of 
another type, if it is not a delivered
+                               // ack and hence important, send it now so it 
is not lost.
+                               if(oldPendingAck.AckType != (byte) 
AckType.DeliveredAck)
+                               {
+                                       Tracer.Debug("Sending old pending ack " 
+ oldPendingAck + ", new pending: " + pendingAck);
+                                       
this.session.Connection.Oneway(oldPendingAck);
+                               }
+                               else
+                               {
+                                       Tracer.Debug("dropping old pending ack 
" + oldPendingAck + ", new pending: " + pendingAck);
+                               }
+                       }
+
+                       if((0.5 * this.info.PrefetchSize) <= 
(this.deliveredCounter - this.additionalWindowSize))
+                       {
+                               this.session.Connection.Oneway(pendingAck);
+                               this.pendingAck = null;
+                               this.deliveredCounter = 0;
+                               this.additionalWindowSize = 0;
+                       }
+               }
+
+               internal void Acknowledge()
+               {
+                       lock(this.dispatchedMessages)
+                       {
+                               // Acknowledge all messages so far.
+                               MessageAck ack = 
MakeAckForAllDeliveredMessages(AckType.ConsumedAck);
+
+                               if(ack == null)
+                               {
+                                       return; // no msgs
+                               }
+
+                               if(this.session.IsTransacted)
+                               {
+                                       this.session.DoStartTransaction();
+                                       ack.TransactionId = 
this.session.TransactionContext.TransactionId;
+                               }
+
+                               this.session.Connection.Oneway(ack);
+                               this.pendingAck = null;
+
+                               // Adjust the counters
+                               this.deliveredCounter = Math.Max(0, 
this.deliveredCounter - this.dispatchedMessages.Count);
+                               this.additionalWindowSize = Math.Max(0, 
this.additionalWindowSize - this.dispatchedMessages.Count);
+
+                               if(!this.session.IsTransacted)
+                               {
+                                       this.dispatchedMessages.Clear();
+                               }
+                       }
+               }
+
+               private void Commit()
+               {
+                       lock(this.dispatchedMessages)
+                       {
+                               this.dispatchedMessages.Clear();
+                       }
+
+                       this.redeliveryDelay = 0;
+               }
+
+               private void Rollback()
+               {
+                       lock(this.unconsumedMessages.SyncRoot)
+                       {
+                               lock(this.dispatchedMessages)
+                               {
+                                       if(this.dispatchedMessages.Count == 0)
+                                       {
+                                               return;
+                                       }
+
+                                       // Only increase the redelivery delay 
after the first redelivery.
+                                       MessageDispatch lastMd = 
this.dispatchedMessages.First.Value;
+                                       int currentRedeliveryCount = 
lastMd.Message.RedeliveryCounter;
+
+                                       redeliveryDelay = 
this.redeliveryPolicy.RedeliveryDelay(currentRedeliveryCount);
+
+                                       MessageId firstMsgId = 
this.dispatchedMessages.Last.Value.Message.MessageId;
+
+                                       foreach(MessageDispatch dispatch in 
this.dispatchedMessages)
+                                       {
+                                               // Allow the message to update 
its internal state to reflect a Rollback.
+                                               
dispatch.Message.OnMessageRollback();
+                                       }
+
+                                       
if(this.redeliveryPolicy.MaximumRedeliveries >= 0 &&
+                                          lastMd.Message.RedeliveryCounter > 
this.redeliveryPolicy.MaximumRedeliveries)
+                                       {
+                                               // We need to NACK the messages 
so that they get sent to the DLQ.
+                                               MessageAck ack = new 
MessageAck();
+
+                                               ack.AckType = (byte) 
AckType.PoisonAck;
+                                               ack.ConsumerId = 
this.info.ConsumerId;
+                                               ack.Destination = 
lastMd.Destination;
+                                               ack.LastMessageId = 
lastMd.Message.MessageId;
+                                               ack.MessageCount = 
this.dispatchedMessages.Count;
+                                               ack.FirstMessageId = firstMsgId;
+
+                                               
this.session.Connection.Oneway(ack);
+
+                                               // Adjust the window size.
+                                               additionalWindowSize = 
Math.Max(0, this.additionalWindowSize - this.dispatchedMessages.Count);
+
+                                               this.redeliveryDelay = 0;
+                                       }
+                                       else
+                                       {
+                                               // We only send a RedeliveryAck 
after the first redelivery
+                                               if(currentRedeliveryCount > 0)
+                                               {
+                                                       MessageAck ack = new 
MessageAck();
+
+                                                       ack.AckType = (byte) 
AckType.RedeliveredAck;
+                                                       ack.ConsumerId = 
this.info.ConsumerId;
+                                                       ack.Destination = 
lastMd.Destination;
+                                                       ack.LastMessageId = 
lastMd.Message.MessageId;
+                                                       ack.MessageCount = 
this.dispatchedMessages.Count;
+                                                       ack.FirstMessageId = 
firstMsgId;
+
+                                                       
this.session.Connection.Oneway(ack);
+                                               }
+
+                                               // stop the delivery of 
messages.
+                                               this.unconsumedMessages.Stop();
+
+                                               foreach(MessageDispatch 
dispatch in this.dispatchedMessages)
+                                               {
+                                                       
this.unconsumedMessages.EnqueueFirst(dispatch);
+                                               }
+
+                                               if(redeliveryDelay > 0 && 
!this.unconsumedMessages.Closed)
+                                               {
+                                                       DateTime deadline = 
DateTime.Now.AddMilliseconds(redeliveryDelay);
+                                                       
ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
+                                               }
+                                               else
+                                               {
+                                                       Start();
+                                               }
+                                       }
+
+                                       this.deliveredCounter -= 
this.dispatchedMessages.Count;
+                                       this.dispatchedMessages.Clear();
+                               }
+                       }
+
+                       // Only redispatch if there's an async listener 
otherwise a synchronous
+                       // consumer will pull them from the local queue.
+                       if(this.listener != null)
+                       {
+                               
this.session.Redispatch(this.unconsumedMessages);
+                       }
+               }
+
+               private void RollbackHelper(Object arg)
+               {
+                       try
+                       {
+                               TimeSpan waitTime = (DateTime) arg - 
DateTime.Now;
+
+                               if(waitTime.CompareTo(TimeSpan.Zero) > 0)
+                               {
+                                       Thread.Sleep(waitTime);
+                               }
+
+                               this.Start();
+                       }
+                       catch(Exception e)
+                       {
                                if(!this.unconsumedMessages.Closed)
-                               {       
-                       
this.session.Connection.OnSessionException(this.session, e);
+                               {
+                                       
this.session.Connection.OnSessionException(this.session, e);
                                }
-            }
-        }
+                       }
+               }
+
+               private ActiveMQMessage CreateActiveMQMessage(MessageDispatch 
dispatch)
+               {
+                       ActiveMQMessage message = dispatch.Message.Clone() as 
ActiveMQMessage;
 
-        private ActiveMQMessage CreateActiveMQMessage(MessageDispatch 
dispatch) 
-        {
-            ActiveMQMessage message = dispatch.Message.Clone() as 
ActiveMQMessage;
-                       
                        message.Connection = this.session.Connection;
-            
-            if(this.session.IsClientAcknowledge)
-            {
-                message.Acknowledger += new 
AcknowledgeHandler(DoClientAcknowledge);
-            }
-            else if(this.session.IsIndividualAcknowledge)
-            {
-                message.Acknowledger += new 
AcknowledgeHandler(DoIndividualAcknowledge);
-            }
-            else
-            {
-                message.Acknowledger += new 
AcknowledgeHandler(DoNothingAcknowledge);                
-            }
-            
-            return message;
-        }
-        
-        private void CheckClosed()
-        {
-            if(this.unconsumedMessages.Closed)
-            {
-                throw new NMSException("The Consumer has been Closed");
-            }
-        }
-
-        private void CheckMessageListener()
-        {
-            if(this.listener != null)
-            {
-                throw new NMSException("Cannot set Async listeners on 
Consumers with a prefetch limit of zero");
-            }
-        }
-
-        private bool IsAutoAcknowledgeEach
-        {
-            get 
-            { 
-                return this.session.IsAutoAcknowledge ||
-                       (this.session.IsDupsOkAcknowledge && 
this.info.Destination.IsQueue); 
-            }
-        }
-    
-        private bool IsAutoAcknowledgeBatch 
-        {
-            get { return this.session.IsDupsOkAcknowledge && 
!this.info.Destination.IsQueue; }
-        }
-
-        #region Nested ISyncronization Types
-        
-        class MessageConsumerSynchronization : ISynchronization
-        {
-            private readonly MessageConsumer consumer;
-    
-            public MessageConsumerSynchronization(MessageConsumer consumer)
-            {
-                this.consumer = consumer;
-            }
-    
-            public void BeforeEnd()
-            {
-                this.consumer.Acknowledge();
-                this.consumer.synchronizationRegistered = false;
-            }
-    
-            public void AfterCommit()
-            {
-                this.consumer.Commit();
-                this.consumer.synchronizationRegistered = false;
-            }
-    
-            public void AfterRollback()
-            {
-                this.consumer.Rollback();
-                this.consumer.synchronizationRegistered = false;
-            }
-        }
-    
-        class ConsumerCloseSynchronization : ISynchronization
-        {
-            private readonly MessageConsumer consumer;
-    
-            public ConsumerCloseSynchronization(MessageConsumer consumer)
-            {
-                this.consumer = consumer;
-            }
-    
-            public void BeforeEnd()
-            {
-            }
-    
-            public void AfterCommit()
-            {
-                this.consumer.DoClose();
-            }
-    
-            public void AfterRollback()
-            {
-                this.consumer.DoClose();
-            }
-        }
 
-        #endregion
-    }
+                       if(this.session.IsClientAcknowledge)
+                       {
+                               message.Acknowledger += new 
AcknowledgeHandler(DoClientAcknowledge);
+                       }
+                       else if(this.session.IsIndividualAcknowledge)
+                       {
+                               message.Acknowledger += new 
AcknowledgeHandler(DoIndividualAcknowledge);
+                       }
+                       else
+                       {
+                               message.Acknowledger += new 
AcknowledgeHandler(DoNothingAcknowledge);
+                       }
+
+                       return message;
+               }
+
+               private void CheckClosed()
+               {
+                       if(this.unconsumedMessages.Closed)
+                       {
+                               throw new NMSException("The Consumer has been 
Closed");
+                       }
+               }
+
+               private void CheckMessageListener()
+               {
+                       if(this.listener != null)
+                       {
+                               throw new NMSException("Cannot set Async 
listeners on Consumers with a prefetch limit of zero");
+                       }
+               }
+
+               private bool IsAutoAcknowledgeEach
+               {
+                       get
+                       {
+                               return this.session.IsAutoAcknowledge ||
+                                          (this.session.IsDupsOkAcknowledge && 
this.info.Destination.IsQueue);
+                       }
+               }
+
+               private bool IsAutoAcknowledgeBatch
+               {
+                       get { return this.session.IsDupsOkAcknowledge && 
!this.info.Destination.IsQueue; }
+               }
+
+               #region Nested ISyncronization Types
+
+               class MessageConsumerSynchronization : ISynchronization
+               {
+                       private readonly MessageConsumer consumer;
+
+                       public MessageConsumerSynchronization(MessageConsumer 
consumer)
+                       {
+                               this.consumer = consumer;
+                       }
+
+                       public void BeforeEnd()
+                       {
+                               this.consumer.Acknowledge();
+                               this.consumer.synchronizationRegistered = false;
+                       }
+
+                       public void AfterCommit()
+                       {
+                               this.consumer.Commit();
+                               this.consumer.synchronizationRegistered = false;
+                       }
+
+                       public void AfterRollback()
+                       {
+                               this.consumer.Rollback();
+                               this.consumer.synchronizationRegistered = false;
+                       }
+               }
+
+               class ConsumerCloseSynchronization : ISynchronization
+               {
+                       private readonly MessageConsumer consumer;
+
+                       public ConsumerCloseSynchronization(MessageConsumer 
consumer)
+                       {
+                               this.consumer = consumer;
+                       }
+
+                       public void BeforeEnd()
+                       {
+                       }
+
+                       public void AfterCommit()
+                       {
+                               this.consumer.DoClose();
+                       }
+
+                       public void AfterRollback()
+                       {
+                               this.consumer.DoClose();
+                       }
+               }
+
+               #endregion
+       }
 }


Reply via email to