Author: ritchiem
Date: Sat Apr 11 01:57:36 2009
New Revision: 764148

URL: http://svn.apache.org/viewvc?rev=764148&view=rev
Log:
QPID-1800: modify SAMQQ to record transactionlog etry even when queue isnt 
durable(consistent with restof broker) and send both current and new queue to
BTL for enqueue to ensure references are kept properly. Update BTL to check for 
prevous enqueues and record new enques in any existing list, despatching only 
new enqueues to the delegate

merged from trunk r764075

Modified:
    
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java

Modified: 
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=764148&r1=764147&r2=764148&view=diff
==============================================================================
--- 
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Sat Apr 11 01:57:36 2009
@@ -823,6 +823,12 @@
 
         AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new 
AMQShortString(queueName));
         TransactionLog transactionLog = getVirtualHost().getTransactionLog();
+        
+        if (toQueue.equals(this))
+        {
+            //nothing to do here, message is already at the requested 
destination
+            return;
+        }
 
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
@@ -848,19 +854,24 @@
             // Move the messages in the transaction log.
             for (QueueEntry entry : entries)
             {
-                if (entry.isPersistent() && toQueue.isDurable())
+                if (entry.isPersistent())
                 {
                     //FIXME
                     //fixme
-                    ArrayList list = new ArrayList();
+                    
+                    // Creating a list with the destination queue AND the 
current queue.
+                    // This is a hack to ensure a reference is kept in the 
TLog to the new destination when dequeing 
+                    // the old destination below, thus preventing incorrect 
removal of the message from the store
+                    ArrayList<AMQQueue> list = new ArrayList<AMQQueue>();
                     list.add(toQueue);
+                    list.add(this);
                     transactionLog.enqueueMessage(storeContext, list, 
entry.getMessageId());
                 }
                 // dequeue will remove the messages from the queue
                 entry.dequeue(storeContext);
             }
 
-            // Commit and flush the move transcations.
+            // Commit and flush the move transactions.
             try
             {
                 transactionLog.commitTran(storeContext);
@@ -891,7 +902,7 @@
                 toQueue.enqueue(storeContext, entry.getMessage());
                 // As we only did a dequeue above now that we have moved the 
message we should perform a delete.
                 // We cannot do this earlier as the message will be lost if 
flowed.
-                //entry.delete();
+                entry.delete();
             }
         }
         catch (MessageCleanupException e)
@@ -913,6 +924,12 @@
         AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new 
AMQShortString(queueName));
         TransactionLog transactionLog = getVirtualHost().getTransactionLog();
 
+        if (toQueue.equals(this))
+        {
+            //nothing to do here, message is already at the requested 
destination
+            return;
+        }
+        
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
 
@@ -944,11 +961,15 @@
             // Move the messages in on the transaction log.
             for (QueueEntry entry : entries)
             {
-                if (!entry.isDeleted() && entry.isPersistent() && 
toQueue.isDurable())
+                if (!entry.isDeleted() && entry.isPersistent())
                 {
                     //fixme
                     //FIXME
+                    
+                    // Creating a list with the destination queue AND the 
current queue.
+                    // This is a hack to ensure a reference is kept in the 
TLog to the old destination when enqueing 
                     ArrayList list = new ArrayList();
+                    list.add(this);
                     list.add(toQueue);
                     transactionLog.enqueueMessage(storeContext, list, 
entry.getMessageId());
                 }

Modified: 
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java?rev=764148&r1=764147&r2=764148&view=diff
==============================================================================
--- 
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
 (original)
+++ 
qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/transactionlog/BaseTransactionLog.java
 Sat Apr 11 01:57:36 2009
@@ -67,12 +67,47 @@
             {
                 _logger.info("Recording Enqueue of (" + messageId + ") on 
queue:" + queues);
             }
+            
+            //list to hold which new queues to enqueue the message on
+            ArrayList<AMQQueue> toEnqueueList = new ArrayList<AMQQueue>();
+            
+            List<AMQQueue> enqueuedList = _idToQueues.get(messageId);
+            if (enqueuedList != null)
+            {
+                //There are previous enqueues for this messageId
+                synchronized (enqueuedList)
+                {
+                    for(AMQQueue queue : queues)
+                    {
+                        if(!enqueuedList.contains(queue))
+                        {
+                            //update the old list.
+                            enqueuedList.add(queue);
+                            //keep track of new enqueues to be made
+                            toEnqueueList.add(queue);
+                        }
+                    }
+                }
+                
+                if(toEnqueueList.isEmpty())
+                {
+                    //no new queues to enqueue message on
+                    return;
+                }
+            }
+            else
+            {
+                //No existing list, add all provided queues (cloning 
toEnqueueList in case someone else changes original).
+                toEnqueueList.addAll(queues);
+                _idToQueues.put(messageId, 
Collections.synchronizedList((ArrayList<AMQQueue>)toEnqueueList.clone()));
+            }
 
-            //Clone the list incase someone else changes it.
-            _idToQueues.put(messageId, 
Collections.synchronizedList((ArrayList<AMQQueue>)queues.clone()));
+            _delegate.enqueueMessage(context, toEnqueueList, messageId);
+        }
+        else
+        {
+            _delegate.enqueueMessage(context, queues, messageId);
         }
-
-        _delegate.enqueueMessage(context, queues, messageId);
     }
 
     public void dequeueMessage(StoreContext context, AMQQueue queue, Long 
messageId) throws AMQException



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to