Author: rgodfrey
Date: Tue Jan  5 12:14:40 2010
New Revision: 896017

URL: http://svn.apache.org/viewvc?rev=896017&view=rev
Log:
QPID-2321 : Add conflation queues to the Java Broker

Added:
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
Modified:
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
 Tue Jan  5 12:14:40 2010
@@ -68,6 +68,11 @@
         return _config.getInt("priorities", -1);
     }
 
+    /**
+     * Reads the "conflationKey" setting from this queue's configuration.
+     *
+     * @return the configured conflation key, or null when the setting is absent
+     */
+    public String getConflationKey()
+    {
+        return _config.getString("conflationKey", null);
+    }
+
     public String getExchange()
     {
         return _config.getString("exchange", null);

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
 Tue Jan  5 12:14:40 2010
@@ -34,83 +34,83 @@
 
 public class VirtualHostConfiguration
 {
-       private Configuration _config;
-       private String _name;
-       private Map<String, QueueConfiguration> _queues = new HashMap<String, 
QueueConfiguration>();
-       private Map<String, ExchangeConfiguration> _exchanges = new 
HashMap<String, ExchangeConfiguration>();
-
-       public VirtualHostConfiguration(String name, Configuration config) 
throws ConfigurationException
-       {
-               _config = config;
-               _name = name;
-               Iterator i = _config.getList("queues.queue.name").iterator();
-               
-               while (i.hasNext())
-               {
-                       String queueName = (String) i.next();
-                       CompositeConfiguration mungedConf = new 
CompositeConfiguration();
-                       
mungedConf.addConfiguration(_config.subset("queues.queue." + queueName));
-                       mungedConf.addConfiguration(_config.subset("queues"));
-                       _queues.put(queueName, new 
QueueConfiguration(queueName, mungedConf, this));
+    private Configuration _config;
+    private String _name;
+    private Map<String, QueueConfiguration> _queues = new HashMap<String, 
QueueConfiguration>();
+    private Map<String, ExchangeConfiguration> _exchanges = new 
HashMap<String, ExchangeConfiguration>();
+
+    public VirtualHostConfiguration(String name, Configuration config) throws 
ConfigurationException
+    {
+        _config = config;
+        _name = name;
+        Iterator i = _config.getList("queues.queue.name").iterator();
+        
+        while (i.hasNext())
+        {
+            String queueName = (String) i.next();
+            CompositeConfiguration mungedConf = new CompositeConfiguration();
+            mungedConf.addConfiguration(_config.subset("queues.queue." + 
queueName));
+            mungedConf.addConfiguration(_config.subset("queues"));
+            _queues.put(queueName, new QueueConfiguration(queueName, 
mungedConf, this));
         }
 
-               i = _config.getList("exchanges.exchange.name").iterator();
-               int count = 0;
-               while (i.hasNext())
+        i = _config.getList("exchanges.exchange.name").iterator();
+        int count = 0;
+        while (i.hasNext())
         {
-                       CompositeConfiguration mungedConf = new 
CompositeConfiguration();
-                       
mungedConf.addConfiguration(config.subset("exchanges.exchange(" + count++ + 
")"));
-                       
mungedConf.addConfiguration(_config.subset("exchanges"));
-                       String exchName = (String) i.next();
-                       _exchanges.put(exchName, new 
ExchangeConfiguration(exchName, mungedConf));
+            CompositeConfiguration mungedConf = new CompositeConfiguration();
+            mungedConf.addConfiguration(config.subset("exchanges.exchange(" + 
count++ + ")"));
+            mungedConf.addConfiguration(_config.subset("exchanges"));
+            String exchName = (String) i.next();
+            _exchanges.put(exchName, new ExchangeConfiguration(exchName, 
mungedConf));
         }
 
     }
 
-       public String getName()
-       {
+    public String getName()
+    {
         return _name;
     }
 
-       public long getHousekeepingExpiredMessageCheckPeriod()
-       {
-               return 
_config.getLong("housekeeping.expiredMessageCheckPeriod", 
ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
-       }
-
-       public String getAuthenticationDatabase()
-       {
-               return _config.getString("security.authentication.name");
-       }
-       
-       public List getCustomExchanges()
-       {
-               return _config.getList("custom-exchanges.class-name");
-       }
-       
-       public SecurityConfiguration getSecurityConfiguration()
-       {
-               return new SecurityConfiguration(_config.subset("security"));
-       }
-
-       public Configuration getStoreConfiguration()
-       {
-               return _config.subset("store");
-       }
-
-       public String getMessageStoreClass()
-       {
-               return _config.getString("store.class", 
MemoryMessageStore.class.getName());
-       }
-
-       public List getExchanges()
-       {
-               return _config.getList("exchanges.exchange.name");
-       }
-
-       public String[] getQueueNames()
-       {
-               return _queues.keySet().toArray(new String[_queues.size()]);
-       }
+    /**
+     * Reads the "housekeeping.expiredMessageCheckPeriod" setting of this virtual host.
+     * When the setting is absent, the housekeeping check period of the
+     * ApplicationRegistry-wide configuration is used as the default.
+     *
+     * @return the configured period, or the registry-wide default
+     */
+    public long getHousekeepingExpiredMessageCheckPeriod()
+    {
+        return _config.getLong("housekeeping.expiredMessageCheckPeriod", 
ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
+    }
+
+    /**
+     * Reads the "security.authentication.name" setting of this virtual host.
+     * No default is supplied here, so the result for a missing setting is
+     * whatever the underlying Configuration returns.
+     *
+     * @return the value configured under "security.authentication.name"
+     */
+    public String getAuthenticationDatabase()
+    {
+        return _config.getString("security.authentication.name");
+    }
+    
+    /**
+     * Returns every value configured under "custom-exchanges.class-name".
+     *
+     * @return the (raw-typed) list of configured values
+     */
+    public List getCustomExchanges()
+    {
+        return _config.getList("custom-exchanges.class-name");
+    }
+    
+    /**
+     * Wraps the "security" subset of this virtual host's configuration.
+     * A new SecurityConfiguration object is created on every call.
+     *
+     * @return a SecurityConfiguration built from the "security" subset
+     */
+    public SecurityConfiguration getSecurityConfiguration()
+    {
+        return new SecurityConfiguration(_config.subset("security"));
+    }
+
+    /**
+     * Returns the "store" subset of this virtual host's configuration.
+     *
+     * @return the configuration subset rooted at "store"
+     */
+    public Configuration getStoreConfiguration()
+    {
+        return _config.subset("store");
+    }
+
+    /**
+     * Reads the "store.class" setting of this virtual host.
+     *
+     * @return the configured value, or the class name of MemoryMessageStore
+     *         when the setting is absent
+     */
+    public String getMessageStoreClass()
+    {
+        return _config.getString("store.class", 
MemoryMessageStore.class.getName());
+    }
+
+    /**
+     * Returns every value configured under "exchanges.exchange.name".
+     *
+     * @return the (raw-typed) list of configured exchange names
+     */
+    public List getExchanges()
+    {
+        return _config.getList("exchanges.exchange.name");
+    }
+
+    /**
+     * Returns the keys of the internal queue-configuration map as an array.
+     *
+     * @return a newly allocated array holding the names in {@code _queues}
+     */
+    public String[] getQueueNames()
+    {
+        return _queues.keySet().toArray(new String[_queues.size()]);
+    }
 
     public ExchangeConfiguration getExchangeConfiguration(String exchangeName)
     {
@@ -166,7 +166,6 @@
         return _config.getLong("queues.minimumAlertRepeatGap", 0);
     }
 
-
     public long getCapacity()
     {
         return _config.getLong("queues.capacity", 0l);

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
 Tue Jan  5 12:14:40 2010
@@ -33,6 +33,7 @@
 public class AMQQueueFactory
 {
     public static final AMQShortString X_QPID_PRIORITIES = new 
AMQShortString("x-qpid-priorities");
+    public static final AMQShortString X_QPID_CONFLATION_KEY = new 
AMQShortString ("x-qpid-conflation-key");
 
     private abstract static class QueueProperty
     {
@@ -133,9 +134,14 @@
             throws AMQException
     {
         final int priorities = arguments == null ? 1 : 
arguments.containsKey(X_QPID_PRIORITIES) ? 
arguments.getInteger(X_QPID_PRIORITIES) : 1;
+        final String conflationKey = arguments == null ? null : 
arguments.containsKey(X_QPID_CONFLATION_KEY) ? 
arguments.getString(X_QPID_CONFLATION_KEY) : null;
 
         AMQQueue q = null;
-        if(priorities > 1)
+        if(conflationKey != null)
+        {
+            q = new ConflationQueue(name, durable, owner, autoDelete, 
virtualHost, new AMQShortString(conflationKey));
+        }
+        else if(priorities > 1)
         {
             q = new AMQPriorityQueue(name, durable, owner, autoDelete, 
virtualHost, priorities);
         }
@@ -184,6 +190,15 @@
             }
             arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
         }
+        String conflationKey = config.getConflationKey();
+        if(conflationKey != null)
+        {
+            if(arguments == null)
+            {
+                arguments = new FieldTable();
+            }
+            arguments.put(new AMQShortString("x-qpid-conflation-key"), 
conflationKey);
+        }
 
         AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, 
host, arguments);
         q.configure(config);

Added: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=896017&view=auto
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
 (added)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
 Tue Jan  5 12:14:40 2010
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+
+/**
+ * A SimpleAMQQueue whose entry list is produced by a
+ * ConflationQueueList.Factory configured with a conflation key.
+ * The class adds no behaviour of its own beyond selecting that factory.
+ */
+public class ConflationQueue extends SimpleAMQQueue
+{
+    /**
+     * Creates the queue, forwarding all standard arguments to SimpleAMQQueue.
+     *
+     * @param name          queue name
+     * @param durable       passed through to the superclass
+     * @param owner         passed through to the superclass
+     * @param autoDelete    passed through to the superclass
+     * @param virtualHost   passed through to the superclass
+     * @param conflationKey key handed to the ConflationQueueList.Factory
+     * @throws AMQException declared by this constructor; presumably propagated
+     *                      from the superclass constructor - confirm
+     */
+    protected ConflationQueue(AMQShortString name,
+                              boolean durable,
+                              AMQShortString owner,
+                              boolean autoDelete,
+                              VirtualHost virtualHost,
+                              AMQShortString conflationKey)
+            throws AMQException
+    {
+        super(name, durable, owner, autoDelete, virtualHost, new 
ConflationQueueList.Factory(conflationKey));
+    }
+
+
+}

Added: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=896017&view=auto
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
 (added)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
 Tue Jan  5 12:14:40 2010
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.AMQChannel;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Queue entry list that conflates messages: for each distinct value of the
+ * conflation-key header it keeps a reference to the latest entry seen, and
+ * attempts to discard an entry once a later one with the same value exists.
+ */
+public class ConflationQueueList extends SimpleQueueEntryList
+{
+
+    // Name of the message header whose value identifies which messages conflate.
+    private final AMQShortString _conflationKey;
+    // Header value -> holder of the latest entry recorded for that value.
+    // NOTE(review): nothing in this class removes keys from the map, so it
+    // appears to grow with the number of distinct values - confirm intended.
+    private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> 
_latestValuesMap =
+        new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+
+    /**
+     * @param queue         queue this list belongs to (passed to the superclass)
+     * @param conflationKey header name used to look up the conflation value
+     */
+    public ConflationQueueList(AMQQueue queue, AMQShortString conflationKey)
+    {
+        super(queue);
+        _conflationKey = conflationKey;
+    }
+
+    /**
+     * Creates a ConflationQueueEntry instead of the superclass's default
+     * entry, so that add() can attach the latest-value reference to it.
+     */
+    @Override
+    protected ConflationQueueEntry createQueueEntry(AMQMessage message)
+    {
+        return new ConflationQueueEntry(this, message);
+    }
+
+
+    /**
+     * Appends the message through the superclass and then conflates it against
+     * the latest entry recorded for the same conflation-key header value.
+     * A message with no value for the conflation key is appended unchanged.
+     *
+     * @param message      message to enqueue
+     * @param storeContext context passed to super.add() and to discard() for
+     *                     whichever entry is dropped here
+     * @return the entry created for the message; it may already have been
+     *         discarded if a later entry for the same value was recorded
+     * @throws RuntimeException wrapping any AMQException raised inside the
+     *                          conflation step
+     */
+    @Override
+    public QueueEntry add(final AMQMessage message, final StoreContext 
storeContext)
+    {
+        // The cast relies on createQueueEntry() above returning ConflationQueueEntry.
+        ConflationQueueEntry entry = (ConflationQueueEntry) 
(super.add(message, storeContext));
+        AtomicReference<QueueEntry> latestValueReference = null;
+
+        try
+        {
+            // Read the conflation-key header from the message.
+            // NOTE(review): assumes getHeaders() never returns null - confirm.
+            Object value = 
((BasicContentHeaderProperties)message.getContentHeaderBody().properties).getHeaders().get(_conflationKey);
            
+            if(value != null)
+            {
+                latestValueReference = _latestValuesMap.get(value);
+                if(latestValueReference == null)
+                {
+                    // First sighting of this value: try to register this entry,
+                    // then re-read in case another thread registered first.
+                    _latestValuesMap.putIfAbsent(value, new 
AtomicReference<QueueEntry>(entry));
+                    latestValueReference = _latestValuesMap.get(value);
+                }
+                QueueEntry oldEntry;
+
+                // Loop until the recorded entry is not ordered before ours, or
+                // we have installed ours with a successful compare-and-set.
+                // presumably compareTo() orders entries by queue position - confirm
+                do
+                {
+                    oldEntry = latestValueReference.get();
+                }
+                while(oldEntry.compareTo(entry) < 0 && 
!latestValueReference.compareAndSet(oldEntry, entry));
+
+                if(oldEntry.compareTo(entry) < 0)
+                {
+                    // We replaced some other entry to become the newest value
+                    if(oldEntry.acquire())
+                    {
+                        oldEntry.discard(storeContext);
+                    }
+                }
+                else if (oldEntry.compareTo(entry) > 0)
+                {
+                    // A newer entry came along
+                    if(entry.acquire())
+                    {
+                        entry.discard(storeContext);
+                    }
+                }
+                // compareTo() == 0: neither branch runs and nothing is discarded.
+            }
+        }
+        catch (AMQException e)
+        {
+
+            throw new RuntimeException(e);
+        }
+
+        // Stays null when the message carried no conflation value.
+        // NOTE(review): this is assigned after super.add() has already linked
+        // the entry into the list - confirm release() cannot run before it.
+        entry.setLatestValueReference(latestValueReference);
+        return entry;
+    }
+
+    /**
+     * Queue entry that remembers the latest-value holder for its conflation
+     * value so that release() can discard it once it has been superseded.
+     */
+    private final class ConflationQueueEntry extends QueueEntryImpl
+    {
+
+
+        // Holder shared with _latestValuesMap; null if no conflation value.
+        private AtomicReference<QueueEntry> _latestValueReference;
+
+        public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, 
AMQMessage message)
+        {
+            super(queueEntryList, message);
+        }
+
+
+        /**
+         * Releases the entry via the superclass and then, if this entry is no
+         * longer the latest one for its conflation value and can be acquired,
+         * discards it.
+         *
+         * @throws RuntimeException wrapping FailedDequeueException or
+         *                          MessageCleanupException from discard()
+         */
+        public void release()
+        {
+            // Capture the channel's store context before calling super.release();
+            // presumably releasing clears the delivered subscription - confirm.
+            Subscription sub = getDeliveredSubscription();
+
+            StoreContext storeContext = null;
+            if(sub != null)
+            {
+                AMQChannel channel = sub.getChannel();
+                if(channel != null)
+                {
+                    storeContext = channel.getStoreContext();
+                }
+            }
+
+
+            super.release();
+
+            if(_latestValueReference != null)
+            {
+                // Superseded: the holder now points at a different entry.
+                if((_latestValueReference.get() != this) && acquire())
+                {
+                    // No subscription/channel context available: use a fresh one.
+                    if(storeContext == null)
+                    {
+                        storeContext = new StoreContext();
+                    }
+                    try
+                    {
+                        discard(storeContext);
+                    }
+                    catch (FailedDequeueException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    catch (MessageCleanupException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+
+        }
+
+        /**
+         * Attaches the latest-value holder for this entry's conflation value.
+         *
+         * @param latestValueReference the holder, or null for no conflation
+         */
+        public void setLatestValueReference(final AtomicReference<QueueEntry> 
latestValueReference)
+        {
+            _latestValueReference = latestValueReference;
+        }
+    }
+
+    /**
+     * QueueEntryListFactory that builds ConflationQueueList instances for a
+     * fixed conflation key.
+     */
+    static class Factory implements QueueEntryListFactory
+    {
+        private final AMQShortString _conflationKey;
+
+        Factory(AMQShortString conflationKey)
+        {
+            _conflationKey = conflationKey;
+        }
+
+        /** Creates a new ConflationQueueList for the queue using the stored key. */
+        public QueueEntryList createQueueEntryList(AMQQueue queue)
+        {
+            return new ConflationQueueList(queue, _conflationKey);
+        }
+    }
+
+}

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
 Tue Jan  5 12:14:40 2010
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.framing.CommonContentHeaderProperties;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
 
 public class PriorityQueueList implements QueueEntryList
 {
@@ -52,7 +53,7 @@
         return _queue;
     }
 
-    public QueueEntry add(AMQMessage message)
+    public QueueEntry add(AMQMessage message, final StoreContext storeContext)
     {
         try
         {
@@ -65,7 +66,7 @@
             {
                 index = 0;
             }
-            return _priorityLists[index].add(message);
+            return _priorityLists[index].add(message, storeContext);
         }
         catch (AMQException e)
         {

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
 Tue Jan  5 12:14:40 2010
@@ -143,6 +143,8 @@
 
     boolean isAcquired();
 
+    boolean isAvailable();
+
     boolean acquire();
     boolean acquire(Subscription sub);
 

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
 Tue Jan  5 12:14:40 2010
@@ -137,6 +137,13 @@
         return _state.getState() == State.ACQUIRED;
     }
 
+
+    /**
+     * Reports whether this entry's current state is State.AVAILABLE.
+     *
+     * @return true only when the state read at call time is AVAILABLE
+     */
+    public boolean isAvailable()
+    {
+        return _state.getState() == State.AVAILABLE;
+    }
+
+
     public boolean acquire()
     {
         return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
 Tue Jan  5 12:14:40 2010
@@ -20,11 +20,13 @@
 */
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.store.StoreContext;
+
 public interface QueueEntryList
 {
     AMQQueue getQueue();
 
-    QueueEntry add(AMQMessage message);
+    QueueEntry add(AMQMessage message, final StoreContext storeContext);
 
     QueueEntry next(QueueEntry node);
 

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Tue Jan  5 12:14:40 2010
@@ -375,7 +375,7 @@
 
             try
             {
-                entry = _entries.add(message);
+                entry = _entries.add(message, storeContext);
 
                 deliverToSubscription(exclusiveSub, entry);
 
@@ -394,7 +394,7 @@
         }
         else
         {
-            entry = _entries.add(message);
+            entry = _entries.add(message, storeContext);
             /*
 
             iterate over subscriptions and if any is at the end of the queue 
and can deliver this message, then deliver the message
@@ -589,7 +589,7 @@
 
         SubscriptionList.SubscriptionNodeIterator subscriberIter = 
_subscriptionList.iterator();
         // iterate over all the subscribers, and if they are in advance of 
this queue entry then move them backwards
-        while (subscriberIter.advance())
+        while (subscriberIter.advance() && entry.isAvailable())
         {
             Subscription sub = subscriberIter.getNode().getSubscription();
 

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
 Tue Jan  5 12:14:40 2010
@@ -1,5 +1,7 @@
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.store.StoreContext;
+
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /*
@@ -74,9 +76,9 @@
     }
 
 
-    public QueueEntry add(AMQMessage message)
+    public QueueEntry add(AMQMessage message, final StoreContext storeContext)
     {
-        QueueEntryImpl node = new QueueEntryImpl(this, message);
+        QueueEntryImpl node = createQueueEntry(message);
         for (;;)
         {
             QueueEntryImpl tail = _tail;
@@ -101,6 +103,11 @@
         }
     }
 
+    /**
+     * Factory hook used by add() to build the node for a message.
+     * Subclasses may override it to supply their own QueueEntryImpl subtype.
+     *
+     * @param message message the new entry will hold
+     * @return a new QueueEntryImpl bound to this list
+     */
+    protected QueueEntryImpl createQueueEntry(AMQMessage message) 
+    {
+        return new QueueEntryImpl(this, message);
+    }
+
     public QueueEntry next(QueueEntry node)
     {
         return ((QueueEntryImpl)node).getNext();

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
 Tue Jan  5 12:14:40 2010
@@ -22,7 +22,6 @@
 
 import junit.framework.TestCase;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.queue.MockQueueEntry;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.SimpleQueueEntryList;
 import org.apache.qpid.server.queue.MockAMQMessage;
@@ -38,7 +37,6 @@
 import java.util.Map;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
-import java.util.Iterator;
 
 /**
  * QPID-1385 : Race condition between added to unacked map and resending due 
to a rollback.
@@ -78,7 +76,7 @@
         {
             AMQMessage msg = new MockAMQMessage(id);
 
-            list.add(msg);
+            list.add(msg, new StoreContext());
 
             //Increment ID;
             id++;

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
 Tue Jan  5 12:14:40 2010
@@ -292,6 +292,11 @@
                     return false;  //To change body of implemented methods use 
File | Settings | File Templates.
                 }
 
+                public boolean isAvailable()
+                {
+                    return false;  //To change body of implemented methods use 
File | Settings | File Templates.
+                }
+
                 public boolean acquire()
                 {
                     return false;  //To change body of implemented methods use 
File | Settings | File Templates.

Modified: 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
--- 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
 (original)
+++ 
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
 Tue Jan  5 12:14:40 2010
@@ -114,6 +114,11 @@
         return false;
     }
 
+    /**
+     * Mock implementation: always reports the entry as not available.
+     *
+     * @return false, unconditionally
+     */
+    public boolean isAvailable()
+    {
+        return false;
+    }
+
     public boolean isDeleted()
     {
         return false;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to