Author: robbie
Date: Tue Dec  7 12:22:54 2010
New Revision: 1042997

URL: http://svn.apache.org/viewvc?rev=1042997&view=rev
Log:
QPID-2971 - onMessage/receive + recover/rollback handling of Max Delivery Count 
for 0-8/0-9 consumers

Added:
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java
Modified:
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1042997&r1=1042996&r2=1042997&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Tue Dec  7 12:22:54 2010
@@ -314,6 +314,9 @@ public abstract class AMQSession<C exten
     /** All the delivered message tags */
     protected ConcurrentLinkedQueue<Long> _deliveredMessageTags = new 
ConcurrentLinkedQueue<Long>();
 
+    /** The last asynchronously delivered auto-ack message tag */
+    protected Long _lastAsyncAutoAckDeliveryTag = null;
+    
     /** Holds the dispatcher thread for this session. */
     protected Dispatcher _dispatcher;
 
@@ -821,7 +824,8 @@ public abstract class AMQSession<C exten
                                                          "Forced rollback");
             }
 
-
+            ArrayList<Long> ackedMessageTags = new ArrayList<Long>();
+            
             // Acknowledge all delivered messages
             while (true)
             {
@@ -832,10 +836,17 @@ public abstract class AMQSession<C exten
                 }
 
                 acknowledgeMessage(tag, false);
+                ackedMessageTags.add(tag);
             }
             // Commits outstanding messages and acknowledgments
             sendCommit();
             markClean();
+            
+            //remove MaxRedelivery info for the committed deliveryTags to 
enhance retention of other message tags
+            for(C consumer : _consumers.values())
+            {
+                
consumer.removeDeliveryCountRecordsForMessages(ackedMessageTags);
+            }
         }
         catch (AMQException e)
         {
@@ -1631,6 +1642,8 @@ public abstract class AMQSession<C exten
                 _dispatcher.rollback();
             }
 
+            enforceMaxDeliveryCountDuringRecover();
+            
             sendRecover();
 
             markClean();
@@ -1650,6 +1663,76 @@ public abstract class AMQSession<C exten
         }
     }
 
+    private void enforceMaxDeliveryCountDuringRecover()
+    {
+        ArrayList<C> consumersToCheck = new ArrayList<C>(_consumers.values());
+        Iterator<C> iter = consumersToCheck.iterator();
+        if(!iter.hasNext())
+        {
+            return;
+        }
+
+        //remove any consumers not enforcing MaxDelivery
+        while(iter.hasNext())
+        {
+            C con = iter.next();
+            if(!con.isMaxDeliveryCountEnforced())
+            {
+                iter.remove();
+            }
+        }
+        
+        if (consumersToCheck.size() > 0)
+        {
+            //reject(false) any messages we don't want returned again 
+            switch(_acknowledgeMode)
+            {
+                case Session.CLIENT_ACKNOWLEDGE:
+                    for(long tag : _unacknowledgedMessageTags)
+                    {
+                        for(C consumer : consumersToCheck)
+                        {
+                            if(!consumer.shouldRequeueMessage(tag))
+                            {
+                                //consumer said we should not requeue the 
message, do reject(false)
+                                rejectMessage(tag, false);
+                                
+                                //explicitly remove records for message, we 
know they won't be used again
+                                
consumer.removeDeliveryCountRecordsForMessage(tag);
+                                //no need to check other consumers now
+                                break;
+                            }
+                        }
+                    }
+                    break;
+                case Session.DUPS_OK_ACKNOWLEDGE:
+                    //fall through
+                case Session.AUTO_ACKNOWLEDGE:
+                    //check the last message asynchronously delivered via 
auto-ack
+                    Long tag = getLastAsyncAutoAckDeliveryTag();
+                    clearLastAsyncAutoAckDeliveryTag();
+                    
+                    if(tag != null)
+                    {
+                        for(C consumer : consumersToCheck)
+                        {
+                            if(consumer.isMessageListenerSet() && 
!consumer.shouldRequeueMessage(tag))
+                            {
+                                //consumer said we should not requeue the 
message, do reject(false)
+                                rejectMessage(tag, false);
+
+                                //explicitly remove records for message, we 
know they won't be used again
+                                
consumer.removeDeliveryCountRecordsForMessage(tag);
+                                //no need to check other consumers now
+                                break;
+                            }
+                        }
+                    }
+                    break;
+            }
+        }
+    }
+
     protected abstract void sendRecover() throws AMQException, 
FailoverException;
 
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
@@ -2046,7 +2129,7 @@ public abstract class AMQSession<C exten
 
     void syncDispatchQueue()
     {
-        if (Thread.currentThread() == _dispatcherThread)
+        if (isDispatcherThread())
         {
             while (!_closed.get() && !_queue.isEmpty())
             {
@@ -2154,7 +2237,7 @@ public abstract class AMQSession<C exten
     void startDispatcherIfNecessary()
     {
         //If we are the dispatcher then we don't need to check we are started
-        if (Thread.currentThread() == _dispatcherThread)
+        if (isDispatcherThread())
         {
             return;
         }
@@ -2762,11 +2845,6 @@ public abstract class AMQSession<C exten
         _producers.put(new Long(producerId), producer);
     }
 
-    private void rejectAllMessages(boolean requeue)
-    {
-        rejectMessagesForConsumerTag(0, requeue, true);
-    }
-
     /**
      * @param consumerTag The consumerTag to prune from queue or all if null
      * @param requeue     Should the removed messages be requeued (or 
discarded. Possibly to DLQ)
@@ -3307,4 +3385,24 @@ public abstract class AMQSession<C exten
     {
         return _closing.get()|| _connection.isClosing();
     }
+
+    public void setLastAsyncAutoAckDeliveryTag(Long tag)
+    {
+        _lastAsyncAutoAckDeliveryTag = tag;
+    }
+    
+    public Long getLastAsyncAutoAckDeliveryTag()
+    {
+        return _lastAsyncAutoAckDeliveryTag;
+    }
+    
+    public void clearLastAsyncAutoAckDeliveryTag()
+    {
+        _lastAsyncAutoAckDeliveryTag = null;
+    }
+    
+    protected boolean isDispatcherThread()
+    {
+        return Thread.currentThread() == _dispatcherThread;
+    }
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1042997&r1=1042996&r2=1042997&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
 Tue Dec  7 12:22:54 2010
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.client;
 
-
 import java.util.Map;
 
 import javax.jms.Destination;
@@ -241,17 +240,38 @@ public final class AMQSession_0_8 extend
                 break;
             }
 
-            rejectMessage(tag, true);
+            boolean shouldRequeue = true;
+            BasicMessageConsumer_0_8 rejectingConsumer = null;
+
+            for(BasicMessageConsumer_0_8 consumer : _consumers.values())
+            {
+                shouldRequeue = consumer.shouldRequeueMessage(tag);
+                if(!shouldRequeue)
+                {
+                    rejectingConsumer = consumer;
+
+                    //no need to consult other consumers now, it is rejected.
+                    break;
+                }
+            }
+
+            rejectMessage(tag, shouldRequeue);
+            if(!shouldRequeue)
+            {
+                //explicitly remove records for message, we know they won't be 
used again
+                rejectingConsumer.removeDeliveryCountRecordsForMessage(tag);
+            }
         }
     }
 
     public void rejectMessage(long deliveryTag, boolean requeue)
     {
-        if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == 
SESSION_TRANSACTED))
+        if ((_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == 
SESSION_TRANSACTED) ||
+              ((_acknowledgeMode == AUTO_ACKNOWLEDGE || _acknowledgeMode == 
DUPS_OK_ACKNOWLEDGE ) && hasMessageListeners()))
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting delivery tag:" + deliveryTag + 
":SessionHC:" + this.hashCode());
+                _logger.debug("Rejecting delivery tag:" + deliveryTag + 
":ReQueue:" + requeue + ":SessionHC:" + this.hashCode());
             }
 
             BasicRejectBody body = 
getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1042997&r1=1042996&r2=1042997&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Tue Dec  7 12:22:54 2010
@@ -33,15 +33,11 @@ import org.slf4j.LoggerFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
+
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -116,29 +112,11 @@ public abstract class BasicMessageConsum
      */
     protected final int _acknowledgeMode;
 
-    /**
-     * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
-     */
-    private int _outstanding;
-
-    /**
-     * Switch to enable sending of acknowledgements when using 
DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
-     * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
-     */
-    private boolean _dups_ok_acknowledge_send;
-
-    /**
-     * List of tags delievered, The last of which which should be acknowledged 
on commit in transaction mode.
-     */
-    private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new 
ConcurrentLinkedQueue<Long>();
-
-    /** The last tag that was "multiple" acknowledged on this session (if 
transacted) */
-    private long _lastAcked;
-
-    /** set of tags which have previously been acked; but not part of the 
multiple ack (transacted mode only) */
-    private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
+    private int _idMapSize = 100;//TODO: set by configuration
+    private int _maxDeliveryAttempts = 3; //TODO: set by configuration
+    private boolean _maxRedeliverEnabled = true;//TODO set based on above 
config
 
-    private final Object _commitLock = new Object();
+    final DeliveryCountTracker _tracker = new 
DeliveryCountTracker(_idMapSize);//TODO
 
     /**
      * The thread that was used to call receive(). This is important for being 
able to interrupt that thread if a
@@ -757,19 +735,69 @@ public abstract class BasicMessageConsum
         }
     }
 
+    private void recordMessageID(AbstractJMSMessage message)
+    {
+        if(!isMaxDeliveryCountEnforced())
+        {
+            return;
+        }
+        
+        String msgId = null;
+
+        try
+        {
+            msgId = message.getJMSMessageID();
+            if(msgId != null)
+            {
+                _tracker.recordMessage(msgId, message.getDeliveryTag());
+            }
+        }
+        catch (JMSException e)
+        {
+            _logger.warn("Exception while retrieving JMSMessageID from 
message" +
+                    " with deliveryTag '" + message.getDeliveryTag() + "': " + 
e, e);
+        }
+    }
+
+    protected boolean shouldRequeueMessage(long deliveryTag)
+    {
+        if(!isMaxDeliveryCountEnforced())
+        {
+            return true;
+        }
+
+        int count = _tracker.getDeliveryCount(deliveryTag);
+
+        boolean reQueue = count  < _maxDeliveryAttempts;
+
+        return reQueue;
+    }
+
     void preDeliver(AbstractJMSMessage msg)
     {
         switch (_acknowledgeMode)
         {
+            case Session.DUPS_OK_ACKNOWLEDGE:
+                //fall through
+            case Session.AUTO_ACKNOWLEDGE:
+                if(isMessageListenerSet())
+                {
+                    //keep track of JMSMessageIDs handed to the client
+                    recordMessageID(msg);
 
+                    
_session.setLastAsyncAutoAckDeliveryTag(msg.getDeliveryTag());
+                }
+                break;
             case Session.PRE_ACKNOWLEDGE:
                 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
                 break;
-
             case Session.CLIENT_ACKNOWLEDGE:
                 // we set the session so that when the user calls 
acknowledge() it can call the method on session
                 // to send out the appropriate frame
                 msg.setAMQSession(_session);
+
+                //keep track of JMSMessageIDs handed to the client
+                recordMessageID(msg);
                 break;
             case Session.SESSION_TRANSACTED:
                 if (isNoConsume())
@@ -780,18 +808,20 @@ public abstract class BasicMessageConsum
                 {
                     _session.addDeliveredMessage(msg.getDeliveryTag());
                     _session.markDirty();
+
+                    //keep track of JMSMessageIDs handed to the client
+                    recordMessageID(msg);
                 }
 
                 break;
         }
 
     }
-
+   
     void postDeliver(AbstractJMSMessage msg) throws JMSException
     {
         switch (_acknowledgeMode)
         {
-
             case Session.CLIENT_ACKNOWLEDGE:
                 if (isNoConsume())
                 {
@@ -799,101 +829,26 @@ public abstract class BasicMessageConsum
                 }
                 _session.markDirty();
                 break;
-
             case Session.DUPS_OK_ACKNOWLEDGE:
+                //fall through
             case Session.AUTO_ACKNOWLEDGE:
                 // we do not auto ack a message if the application code called 
recover()
                 if (!_session.isInRecovery())
                 {
                     _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-                }
 
-                break;
-        }
-    }
-
-
-    /**
-     * Acknowledge up to last message delivered (if any). Used when commiting.
-     *
-     * @return the lastDeliveryTag to acknowledge
-     */
-    Long getLastDelivered()
-    {
-        if (!_receivedDeliveryTags.isEmpty())
-        {
-            Long lastDeliveryTag = _receivedDeliveryTags.poll();
-
-            while (!_receivedDeliveryTags.isEmpty())
-            {
-                lastDeliveryTag = _receivedDeliveryTags.poll();
-            }
-
-            assert _receivedDeliveryTags.isEmpty();
-
-            return lastDeliveryTag;
-        }
-
-        return null;
-    }
-
-    /**
-     * Acknowledge up to last message delivered (if any). Used when commiting.
-     */
-    void acknowledgeDelivered()
-    {
-        synchronized(_commitLock)
-        {
-            ArrayList<Long> tagsToAck = new ArrayList<Long>();
-
-            while (!_receivedDeliveryTags.isEmpty())
-            {
-                tagsToAck.add(_receivedDeliveryTags.poll());
-            }
-
-            Collections.sort(tagsToAck);
-
-            long prevAcked = _lastAcked;
-            long oldAckPoint = -1;
-
-            while(oldAckPoint != prevAcked)
-            {
-                oldAckPoint = prevAcked;
-
-                Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
-                while(tagsToAckIterator.hasNext() && tagsToAckIterator.next() 
== prevAcked+1)
-                {
-                    tagsToAckIterator.remove();
-                    prevAcked++;
-                }
+                    if(isMessageListenerSet())
+                    {
+                        _session.clearLastAsyncAutoAckDeliveryTag();
 
-                Iterator<Long> previousAckIterator = 
_previouslyAcked.iterator();
-                while(previousAckIterator.hasNext() && 
previousAckIterator.next() == prevAcked+1)
-                {
-                    previousAckIterator.remove();
-                    prevAcked++;
+                        //explicitly remove records for message, we know they 
won't be used again
+                        
removeDeliveryCountRecordsForMessage(msg.getDeliveryTag());
+                    }
                 }
-
-            }
-            if(prevAcked != _lastAcked)
-            {
-                _session.acknowledgeMessage(prevAcked, true);
-                _lastAcked = prevAcked;
-            }
-
-            Iterator<Long> tagsToAckIterator = tagsToAck.iterator();
-
-            while(tagsToAckIterator.hasNext())
-            {
-                Long tag = tagsToAckIterator.next();
-                _session.acknowledgeMessage(tag, false);
-                _previouslyAcked.add(tag);
-            }
+                break;
         }
     }
 
-
     void notifyError(Throwable cause)
     {
         // synchronized (_closed)
@@ -1019,7 +974,7 @@ public abstract class BasicMessageConsum
                     iterator.remove();
                     removed = true;
                 }
-                }
+            }
 
             if (removed && (initialSize == _synchronousQueue.size()))
             {
@@ -1072,4 +1027,28 @@ public abstract class BasicMessageConsum
 
     public void failedOverPost() {}
 
+    public boolean isMaxDeliveryCountEnforced()
+    {
+        return _maxRedeliverEnabled;
+    }
+
+    public int getMaxDeliveryAttempts()
+    {
+        return _maxDeliveryAttempts;
+    }
+    public void removeDeliveryCountRecordsForMessage(long deliveryTag)
+    {
+        if(isMaxDeliveryCountEnforced())
+        {
+            _tracker.removeRecordsForMessage(deliveryTag);
+        }
+    }
+
+    public void removeDeliveryCountRecordsForMessages(List<Long> deliveryTags)
+    {
+        if(isMaxDeliveryCountEnforced())
+        {
+            _tracker.removeRecordsForMessages(deliveryTags);
+        }
+    }
 }

Modified: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1042997&r1=1042996&r2=1042997&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Tue Dec  7 12:22:54 2010
@@ -457,4 +457,11 @@ public class BasicMessageConsumer_0_10 e
             clearReceiveQueue();
         }
     }
+    
+    @Override
+    public boolean isMaxDeliveryCountEnforced()
+    {
+        //TODO Implement for 0-10 consumers.
+        return false;
+    }
 }

Added: 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java?rev=1042997&view=auto
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
 (added)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/main/java/org/apache/qpid/client/DeliveryCountTracker.java
 Tue Dec  7 12:22:54 2010
@@ -0,0 +1,196 @@
+package org.apache.qpid.client;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.BidiMap;
+import org.apache.commons.collections.bidimap.TreeBidiMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeliveryCountTracker
+{
+    private static final Logger _logger = 
LoggerFactory.getLogger(DeliveryCountTracker.class);
+    
+    /**
+     * Bidirectional Map of JMSMessageID with MessageTag.
+     */
+    private BidiMap _jmsIDtoDeliverTag = new TreeBidiMap();
+    
+    /**
+     * Map of JMSMessageIDs with count of deliveries.
+     */
+    private Map<String,Integer> _receivedMsgIDs;
+
+    private int _capacity;
+    
+    /**
+     * Creates a new DeliveryCountTracker instance.
+     * 
+     * @param capacity the number of records to track.
+     * @throws IllegalArgumentException if specified capacity not > 0
+     */
+    public DeliveryCountTracker(int capacity) throws IllegalArgumentException
+    {
+        if(capacity <= 0)
+        {
+            throw new IllegalArgumentException("Specified capacity must be 
greater than 0.");
+        }
+        _capacity  = capacity;
+        
+        /*
+         * HashMap Javadoc states: "If the initial capacity is greater than 
the maximum number
+         * of entries divided by the load factor, no rehash operations will 
ever occur."
+         * 
+         * Specifying an additional 5% size at construction with a 1.0 load 
factor to pre-allocate
+         * the entries, bound the max map size, and avoid size increases + 
associated rehashing.
+         */
+        int hashMapSize = (int)(_capacity * 1.05f);
+        
+        /*
+         *  Using the access-ordered LinkedHashMap variant to leverage the LRU 
based entry removal
+         *  behaviour provided when overriding the removeEldestEntry 
method.
+         */
+        _receivedMsgIDs = new LinkedHashMap<String,Integer>(hashMapSize, 1.0f, 
true)
+        {
+            //Control the max size of the map using LRU based removal upon 
insertion.
+            protected boolean removeEldestEntry(Map.Entry<String,Integer> 
eldest)
+            {
+                boolean remove = size() > _capacity;
+                
+                // If the supplied entry is to be removed, also remove its 
associated
+                // delivery tag
+                if(remove)
+                {
+                    String msgId = eldest.getKey();
+                    
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Removing delivery count records for 
message : " + msgId);
+                    }
+                    
+                    synchronized (DeliveryCountTracker.this)
+                    {
+                        //Also remove the message information from the 
deliveryTag map.
+                        if(msgId != null)
+                        {
+                            _jmsIDtoDeliverTag.remove(msgId);
+                        }
+                    }
+                }
+                
+                return remove;
+            }
+        };
+    }
+
+    /**
+     * Record sighting of a particular JMSMessageID, with the given 
deliveryTag.
+     *
+     * @param msgID the JMSMessageID of the message to track
+     * @param deliveryTag the delivery tag of the most recent encounter of the 
message
+     */
+    public synchronized void recordMessage(String msgID, long deliveryTag)
+    {
+        try
+        {
+            if(msgID == null)
+            {
+                //we can't distinguish between different
+                //messages without a JMSMessageID, so skip
+                return;
+            }
+
+            _jmsIDtoDeliverTag.put(msgID, deliveryTag);
+
+            Integer count = _receivedMsgIDs.get(msgID);
+
+            if(count != null)
+            {
+                ++count;
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Incrementing count for JMSMessageID: '" + 
msgID + "', value now: " + count);
+                }
+                _receivedMsgIDs.put(msgID, count);
+            }
+            else
+            {
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Recording first sighting of JMSMessageID '" 
+ msgID + "'");
+                }
+                _receivedMsgIDs.put(msgID, 1);
+            }
+        }
+        catch(Exception e)
+        {
+            _logger.warn("Exception recording delivery count for message: " + 
msgID, e);
+        }
+    }
+
+    /**
+     * Returns the number of times the message related to the given delivery 
tag has been seen
+     * 
+     * @param deliveryTag delivery tag of the message to retrieve the delivery 
count for
+     * @return the delivery count for that message, or 0 if there is no count 
known
+     */
+    public synchronized int getDeliveryCount(long deliveryTag)
+    {
+        String key = (String) _jmsIDtoDeliverTag.getKey(deliveryTag);
+
+        int count = 0;
+
+        if (key != null)
+        {
+            Integer val = _receivedMsgIDs.get(key);
+            if (val != null)
+            {
+                count = val;
+            }
+        }
+
+        return count; 
+    }
+
+    /**
+     * Removes both JMSMessageID and count related records associated with the 
given deliveryTag if any such records exist.
+     * @param deliveryTag the current tag of the message for which the 
JMSMessageID and count records should be removed
+     */
+    public synchronized void removeRecordsForMessage(long deliveryTag)
+    {
+        String msgId = (String) _jmsIDtoDeliverTag.removeValue(deliveryTag);
+
+        if (msgId != null)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Removed deliveryTag mapping for ID: '" + msgId 
+ "'");
+            }
+
+            Integer count = _receivedMsgIDs.remove(msgId);
+            if(count != null && _logger.isDebugEnabled())
+            {
+                _logger.debug("Removed count mapping for ID: '" + msgId + "' : 
" + count);
+            }
+        }
+    }
+
+    /**
+     * Removes both JMSMessageID and count related records associated with the 
given deliveryTags if any such records exist.
+     * @param deliveryTags the current tags of the messages for which the 
JMSMessageID and count records should be removed
+     */
+    public synchronized void removeRecordsForMessages(List<Long> deliveryTags)
+    {
+        if (deliveryTags == null)
+        {
+            return;
+        }
+        
+        for(long tag : deliveryTags)
+        {
+            removeRecordsForMessage(tag);
+        }
+    }
+}

Added: 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java?rev=1042997&view=auto
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java
 (added)
+++ 
qpid/branches/0.5.x-dev/qpid/java/client/src/test/java/org/apache/qpid/client/DeliveryCountTrackerTest.java
 Tue Dec  7 12:22:54 2010
@@ -0,0 +1,158 @@
+package org.apache.qpid.client;
+
+import java.util.ArrayList;
+
+import junit.framework.TestCase;
+
+/**
+ * Unit tests for {@link DeliveryCountTracker}, covering eviction of the least
+ * recently used record, explicit removal of records by deliveryTag, and the
+ * counting performed when a JMSMessageID is recorded again under a new deliveryTag.
+ */
+public class DeliveryCountTrackerTest extends TestCase
+{
+    /** Capacity handed to every tracker created by {@link #setUp()}. */
+    private static final int CAPACITY = 50;
+
+    private DeliveryCountTracker _tracker;
+
+    /** Creates a fresh tracker of size {@link #CAPACITY} before each test. */
+    protected void setUp()
+    {
+        _tracker = new DeliveryCountTracker(CAPACITY);
+    }
+
+    /**
+     * Test the LRU based eviction policy of the tracker. Both the process of
+     * tracking new sightings of a given JMSMessageID and retrieving the existing
+     * count will involve accessing an existing record and making it the most
+     * recently accessed. Commit/Rollback/Recover should remove any messages that
+     * can't be seen again due to consumption or rejection. Any other messages must
+     * be evicted by LRU policy as and when necessary to make way for new entries.
+     *
+     * Test this by validating that upon tracking one more message than the
+     * capacity, the first message count is lost. Then access the second message
+     * count and insert a further new message occurrence. Verify the third message
+     * count is removed and not the second message count.
+     */
+    public void testLRUeviction()
+    {
+        long id;
+
+        for (id = 1; id <= CAPACITY + 1; id++)
+        {
+            _tracker.recordMessage(String.valueOf(id), id);
+        }
+
+        assertEquals("Count was not as expected. First delivery tag " +
+                     "should have been evicted already:", 0L, _tracker.getDeliveryCount(1L));
+
+        //Retrieve second delivery tag count, ensure it is not zero.
+        //This will also make it the most recently accessed record.
+        assertEquals("Count was not as expected.", 1L, _tracker.getDeliveryCount(2L));
+
+        //Add a new record, check that tag 2 remains and tag 3 was evicted.
+        //The loop above left id one past the last recorded tag, so this is a new message.
+        _tracker.recordMessage(String.valueOf(id), id);
+        assertEquals("Count was not as expected. Second delivery tag " +
+                     "should NOT have been evicted already:", 1L, _tracker.getDeliveryCount(2L));
+        assertEquals("Count was not as expected. Third delivery tag " +
+                     "should have been evicted already:", 0L, _tracker.getDeliveryCount(3L));
+    }
+
+    /**
+     * Test that once it is known a record can never be useful again it can be
+     * successfully removed from the records to allow room for new records without
+     * causing eviction of information that could still be useful.
+     *
+     * Fill the tracker with records, ensure the counts are correct and then
+     * delete them, and ensure the counts get reset.
+     */
+    public void testMessageRecordRemoval()
+    {
+        long id;
+
+        for (id = 1; id <= CAPACITY; id++)
+        {
+            _tracker.recordMessage(String.valueOf(id), id);
+        }
+
+        assertEquals("Count was not as expected.", 1L, _tracker.getDeliveryCount(1L));
+        assertEquals("Count was not as expected.", 1L, _tracker.getDeliveryCount(CAPACITY/2));
+        assertEquals("Count was not as expected.", 1L, _tracker.getDeliveryCount(CAPACITY-1));
+
+        //remove records for first deliveryTag, ensure the others remain as expected
+        _tracker.removeRecordsForMessage(1L);
+        assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(1L));
+        assertEquals("Count was not as expected.", 1L, _tracker.getDeliveryCount(CAPACITY/2));
+        assertEquals("Count was not as expected.", 1L, _tracker.getDeliveryCount(CAPACITY-1));
+
+        //remove records for next deliveryTag, ensure the others remain as expected
+        _tracker.removeRecordsForMessage(CAPACITY/2);
+        assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(1L));
+        assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY/2));
+        assertEquals("Count was not as expected.", 1L, _tracker.getDeliveryCount(CAPACITY-1));
+
+        //remove records for next deliveryTag, ensure the others remain as expected
+        _tracker.removeRecordsForMessage(CAPACITY-1);
+        assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(1L));
+        assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY/2));
+        assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY-1));
+
+        //ensure records for last deliveryTag is still as expected
+        assertEquals("Count was not as expected.", 1L, _tracker.getDeliveryCount(CAPACITY));
+    }
+
+    /**
+     * Test that counts are accurately incremented and associated with the new
+     * deliveryTag when a message with the same JMSMessageID is encountered again,
+     * also ensuring that record of the count is no longer returned via the old
+     * deliveryTag.
+     */
+    public void testCounting()
+    {
+        long id;
+
+        for (id = 1; id <= CAPACITY; id++)
+        {
+            _tracker.recordMessage(String.valueOf(id), id);
+        }
+
+        //verify all counts are currently 1
+        ArrayList<Long> exclusions = new ArrayList<Long>();
+        verifyCounts(1, exclusions, CAPACITY);
+
+        //Gather some of the existing JMSMessageIDs and create new deliveryTags for them,
+        //which can be used to represent receiving the same message again. The loop left
+        //id at CAPACITY + 1, so the new tags do not collide with any tag recorded above.
+        String msgId1 = String.valueOf(1L);
+        long newTag1 = id;
+        String msgId2 = String.valueOf(CAPACITY/2);
+        long newTag2 = ++id;
+        String msgId3 = String.valueOf(CAPACITY);
+        long newTag3 = ++id;
+        _tracker.recordMessage(msgId1, newTag1);
+        _tracker.recordMessage(msgId2, newTag2);
+        _tracker.recordMessage(msgId3, newTag3);
+
+        //Now check the updated values returned are as expected.
+
+        //entries for delivery tags with value 1,CAPACITY/2,CAPACITY should have just been
+        //removed because new delivery tags associated with the same JMSMessageID were
+        //just recorded.
+        assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(1));
+        assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY/2));
+        assertEquals("Count was not as expected.", 0L, _tracker.getDeliveryCount(CAPACITY));
+
+        //The count for the 'redelivered' messages with new deliveryTag should have increased.
+        assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag1));
+        assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag2));
+        assertEquals("Count was not as expected.", 2L, _tracker.getDeliveryCount(newTag3));
+
+        //all the other delivery tags should remain at 1:
+        exclusions.add(1L);
+        exclusions.add((long) (CAPACITY/2));
+        exclusions.add((long) CAPACITY);
+        exclusions.add(newTag1);
+        exclusions.add(newTag2);
+        exclusions.add(newTag3);
+        verifyCounts(1, exclusions, CAPACITY+3);
+    }
+
+    /**
+     * Asserts that every deliveryTag from 1 to numValues inclusive, other than those
+     * listed in excludeFromCheck, reports the expected delivery count.
+     *
+     * @param expectedValue the delivery count each checked tag must report
+     * @param excludeFromCheck deliveryTags to skip
+     * @param numValues the highest deliveryTag to check
+     */
+    private void verifyCounts(long expectedValue, ArrayList<Long> excludeFromCheck, long numValues)
+    {
+        for (long id = 1; id <= numValues; id++)
+        {
+            if (!excludeFromCheck.contains(id))
+            {
+                // Compare against the caller's expectation rather than a fixed value.
+                assertEquals("Count was not as expected for id '" + id + "'.",
+                             expectedValue, _tracker.getDeliveryCount(id));
+            }
+        }
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to