Author: rgodfrey
Date: Tue Jan 5 12:14:40 2010
New Revision: 896017
URL: http://svn.apache.org/viewvc?rev=896017&view=rev
Log:
QPID-2321 : Add conflation queues to the Java Broker
Added:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
Tue Jan 5 12:14:40 2010
@@ -68,6 +68,11 @@
return _config.getInt("priorities", -1);
}
+ public String getConflationKey()
+ {
+ return _config.getString("conflationKey", null);
+ }
+
public String getExchange()
{
return _config.getString("exchange", null);
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
Tue Jan 5 12:14:40 2010
@@ -34,83 +34,83 @@
public class VirtualHostConfiguration
{
- private Configuration _config;
- private String _name;
- private Map<String, QueueConfiguration> _queues = new HashMap<String,
QueueConfiguration>();
- private Map<String, ExchangeConfiguration> _exchanges = new
HashMap<String, ExchangeConfiguration>();
-
- public VirtualHostConfiguration(String name, Configuration config)
throws ConfigurationException
- {
- _config = config;
- _name = name;
- Iterator i = _config.getList("queues.queue.name").iterator();
-
- while (i.hasNext())
- {
- String queueName = (String) i.next();
- CompositeConfiguration mungedConf = new
CompositeConfiguration();
-
mungedConf.addConfiguration(_config.subset("queues.queue." + queueName));
- mungedConf.addConfiguration(_config.subset("queues"));
- _queues.put(queueName, new
QueueConfiguration(queueName, mungedConf, this));
+ private Configuration _config;
+ private String _name;
+ private Map<String, QueueConfiguration> _queues = new HashMap<String,
QueueConfiguration>();
+ private Map<String, ExchangeConfiguration> _exchanges = new
HashMap<String, ExchangeConfiguration>();
+
+ public VirtualHostConfiguration(String name, Configuration config) throws
ConfigurationException
+ {
+ _config = config;
+ _name = name;
+ Iterator i = _config.getList("queues.queue.name").iterator();
+
+ while (i.hasNext())
+ {
+ String queueName = (String) i.next();
+ CompositeConfiguration mungedConf = new CompositeConfiguration();
+ mungedConf.addConfiguration(_config.subset("queues.queue." +
queueName));
+ mungedConf.addConfiguration(_config.subset("queues"));
+ _queues.put(queueName, new QueueConfiguration(queueName,
mungedConf, this));
}
- i = _config.getList("exchanges.exchange.name").iterator();
- int count = 0;
- while (i.hasNext())
+ i = _config.getList("exchanges.exchange.name").iterator();
+ int count = 0;
+ while (i.hasNext())
{
- CompositeConfiguration mungedConf = new
CompositeConfiguration();
-
mungedConf.addConfiguration(config.subset("exchanges.exchange(" + count++ +
")"));
-
mungedConf.addConfiguration(_config.subset("exchanges"));
- String exchName = (String) i.next();
- _exchanges.put(exchName, new
ExchangeConfiguration(exchName, mungedConf));
+ CompositeConfiguration mungedConf = new CompositeConfiguration();
+ mungedConf.addConfiguration(config.subset("exchanges.exchange(" +
count++ + ")"));
+ mungedConf.addConfiguration(_config.subset("exchanges"));
+ String exchName = (String) i.next();
+ _exchanges.put(exchName, new ExchangeConfiguration(exchName,
mungedConf));
}
}
- public String getName()
- {
+ public String getName()
+ {
return _name;
}
- public long getHousekeepingExpiredMessageCheckPeriod()
- {
- return
_config.getLong("housekeeping.expiredMessageCheckPeriod",
ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
- }
-
- public String getAuthenticationDatabase()
- {
- return _config.getString("security.authentication.name");
- }
-
- public List getCustomExchanges()
- {
- return _config.getList("custom-exchanges.class-name");
- }
-
- public SecurityConfiguration getSecurityConfiguration()
- {
- return new SecurityConfiguration(_config.subset("security"));
- }
-
- public Configuration getStoreConfiguration()
- {
- return _config.subset("store");
- }
-
- public String getMessageStoreClass()
- {
- return _config.getString("store.class",
MemoryMessageStore.class.getName());
- }
-
- public List getExchanges()
- {
- return _config.getList("exchanges.exchange.name");
- }
-
- public String[] getQueueNames()
- {
- return _queues.keySet().toArray(new String[_queues.size()]);
- }
+ public long getHousekeepingExpiredMessageCheckPeriod()
+ {
+ return _config.getLong("housekeeping.expiredMessageCheckPeriod",
ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
+ }
+
+ public String getAuthenticationDatabase()
+ {
+ return _config.getString("security.authentication.name");
+ }
+
+ public List getCustomExchanges()
+ {
+ return _config.getList("custom-exchanges.class-name");
+ }
+
+ public SecurityConfiguration getSecurityConfiguration()
+ {
+ return new SecurityConfiguration(_config.subset("security"));
+ }
+
+ public Configuration getStoreConfiguration()
+ {
+ return _config.subset("store");
+ }
+
+ public String getMessageStoreClass()
+ {
+ return _config.getString("store.class",
MemoryMessageStore.class.getName());
+ }
+
+ public List getExchanges()
+ {
+ return _config.getList("exchanges.exchange.name");
+ }
+
+ public String[] getQueueNames()
+ {
+ return _queues.keySet().toArray(new String[_queues.size()]);
+ }
public ExchangeConfiguration getExchangeConfiguration(String exchangeName)
{
@@ -166,7 +166,6 @@
return _config.getLong("queues.minimumAlertRepeatGap", 0);
}
-
public long getCapacity()
{
return _config.getLong("queues.capacity", 0l);
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
Tue Jan 5 12:14:40 2010
@@ -33,6 +33,7 @@
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new
AMQShortString("x-qpid-priorities");
+ public static final AMQShortString X_QPID_CONFLATION_KEY = new
AMQShortString ("x-qpid-conflation-key");
private abstract static class QueueProperty
{
@@ -133,9 +134,14 @@
throws AMQException
{
final int priorities = arguments == null ? 1 :
arguments.containsKey(X_QPID_PRIORITIES) ?
arguments.getInteger(X_QPID_PRIORITIES) : 1;
+ final String conflationKey = arguments == null ? null :
arguments.containsKey(X_QPID_CONFLATION_KEY) ?
arguments.getString(X_QPID_CONFLATION_KEY) : null;
AMQQueue q = null;
- if(priorities > 1)
+ if(conflationKey != null)
+ {
+ q = new ConflationQueue(name, durable, owner, autoDelete,
virtualHost, new AMQShortString(conflationKey));
+ }
+ else if(priorities > 1)
{
q = new AMQPriorityQueue(name, durable, owner, autoDelete,
virtualHost, priorities);
}
@@ -184,6 +190,15 @@
}
arguments.put(new AMQShortString("x-qpid-priorities"), priorities);
}
+ String conflationKey = config.getConflationKey();
+ if(conflationKey != null)
+ {
+ if(arguments == null)
+ {
+ arguments = new FieldTable();
+ }
+ arguments.put(new AMQShortString("x-qpid-conflation-key"),
conflationKey);
+ }
AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete,
host, arguments);
q.configure(config);
Added:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=896017&view=auto
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
(added)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
Tue Jan 5 12:14:40 2010
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+
+public class ConflationQueue extends SimpleAMQQueue
+{
+ protected ConflationQueue(AMQShortString name,
+ boolean durable,
+ AMQShortString owner,
+ boolean autoDelete,
+ VirtualHost virtualHost,
+ AMQShortString conflationKey)
+ throws AMQException
+ {
+ super(name, durable, owner, autoDelete, virtualHost, new
ConflationQueueList.Factory(conflationKey));
+ }
+
+
+}
Added:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=896017&view=auto
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
(added)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
Tue Jan 5 12:14:40 2010
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.AMQChannel;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ConflationQueueList extends SimpleQueueEntryList
+{
+
+ private final AMQShortString _conflationKey;
+ private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>>
_latestValuesMap =
+ new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+
+ public ConflationQueueList(AMQQueue queue, AMQShortString conflationKey)
+ {
+ super(queue);
+ _conflationKey = conflationKey;
+ }
+
+ @Override
+ protected ConflationQueueEntry createQueueEntry(AMQMessage message)
+ {
+ return new ConflationQueueEntry(this, message);
+ }
+
+
+ @Override
+ public QueueEntry add(final AMQMessage message, final StoreContext
storeContext)
+ {
+ ConflationQueueEntry entry = (ConflationQueueEntry)
(super.add(message, storeContext));
+ AtomicReference<QueueEntry> latestValueReference = null;
+
+ try
+ {
+ Object value =
((BasicContentHeaderProperties)message.getContentHeaderBody().properties).getHeaders().get(_conflationKey);
+ if(value != null)
+ {
+ latestValueReference = _latestValuesMap.get(value);
+ if(latestValueReference == null)
+ {
+ _latestValuesMap.putIfAbsent(value, new
AtomicReference<QueueEntry>(entry));
+ latestValueReference = _latestValuesMap.get(value);
+ }
+ QueueEntry oldEntry;
+
+ do
+ {
+ oldEntry = latestValueReference.get();
+ }
+ while(oldEntry.compareTo(entry) < 0 &&
!latestValueReference.compareAndSet(oldEntry, entry));
+
+ if(oldEntry.compareTo(entry) < 0)
+ {
+ // We replaced some other entry to become the newest value
+ if(oldEntry.acquire())
+ {
+ oldEntry.discard(storeContext);
+ }
+ }
+ else if (oldEntry.compareTo(entry) > 0)
+ {
+ // A newer entry came along
+ if(entry.acquire())
+ {
+ entry.discard(storeContext);
+ }
+ }
+ }
+ }
+ catch (AMQException e)
+ {
+
+ throw new RuntimeException(e);
+ }
+
+ entry.setLatestValueReference(latestValueReference);
+ return entry;
+ }
+
+ private final class ConflationQueueEntry extends QueueEntryImpl
+ {
+
+
+ private AtomicReference<QueueEntry> _latestValueReference;
+
+ public ConflationQueueEntry(SimpleQueueEntryList queueEntryList,
AMQMessage message)
+ {
+ super(queueEntryList, message);
+ }
+
+
+ public void release()
+ {
+ Subscription sub = getDeliveredSubscription();
+
+ StoreContext storeContext = null;
+ if(sub != null)
+ {
+ AMQChannel channel = sub.getChannel();
+ if(channel != null)
+ {
+ storeContext = channel.getStoreContext();
+ }
+ }
+
+
+ super.release();
+
+ if(_latestValueReference != null)
+ {
+ if((_latestValueReference.get() != this) && acquire())
+ {
+ if(storeContext == null)
+ {
+ storeContext = new StoreContext();
+ }
+ try
+ {
+ discard(storeContext);
+ }
+ catch (FailedDequeueException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (MessageCleanupException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ }
+
+ public void setLatestValueReference(final AtomicReference<QueueEntry>
latestValueReference)
+ {
+ _latestValueReference = latestValueReference;
+ }
+ }
+
+ static class Factory implements QueueEntryListFactory
+ {
+ private final AMQShortString _conflationKey;
+
+ Factory(AMQShortString conflationKey)
+ {
+ _conflationKey = conflationKey;
+ }
+
+ public QueueEntryList createQueueEntryList(AMQQueue queue)
+ {
+ return new ConflationQueueList(queue, _conflationKey);
+ }
+ }
+
+}
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
Tue Jan 5 12:14:40 2010
@@ -22,6 +22,7 @@
import org.apache.qpid.framing.CommonContentHeaderProperties;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
public class PriorityQueueList implements QueueEntryList
{
@@ -52,7 +53,7 @@
return _queue;
}
- public QueueEntry add(AMQMessage message)
+ public QueueEntry add(AMQMessage message, final StoreContext storeContext)
{
try
{
@@ -65,7 +66,7 @@
{
index = 0;
}
- return _priorityLists[index].add(message);
+ return _priorityLists[index].add(message, storeContext);
}
catch (AMQException e)
{
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Tue Jan 5 12:14:40 2010
@@ -143,6 +143,8 @@
boolean isAcquired();
+ boolean isAvailable();
+
boolean acquire();
boolean acquire(Subscription sub);
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Tue Jan 5 12:14:40 2010
@@ -137,6 +137,13 @@
return _state.getState() == State.ACQUIRED;
}
+
+ public boolean isAvailable()
+ {
+ return _state.getState() == State.AVAILABLE;
+ }
+
+
public boolean acquire()
{
return acquire(NON_SUBSCRIPTION_ACQUIRED_STATE);
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
Tue Jan 5 12:14:40 2010
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.store.StoreContext;
+
public interface QueueEntryList
{
AMQQueue getQueue();
- QueueEntry add(AMQMessage message);
+ QueueEntry add(AMQMessage message, final StoreContext storeContext);
QueueEntry next(QueueEntry node);
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Tue Jan 5 12:14:40 2010
@@ -375,7 +375,7 @@
try
{
- entry = _entries.add(message);
+ entry = _entries.add(message, storeContext);
deliverToSubscription(exclusiveSub, entry);
@@ -394,7 +394,7 @@
}
else
{
- entry = _entries.add(message);
+ entry = _entries.add(message, storeContext);
/*
iterate over subscriptions and if any is at the end of the queue
and can deliver this message, then deliver the message
@@ -589,7 +589,7 @@
SubscriptionList.SubscriptionNodeIterator subscriberIter =
_subscriptionList.iterator();
// iterate over all the subscribers, and if they are in advance of
this queue entry then move them backwards
- while (subscriberIter.advance())
+ while (subscriberIter.advance() && entry.isAvailable())
{
Subscription sub = subscriberIter.getNode().getSubscription();
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
Tue Jan 5 12:14:40 2010
@@ -1,5 +1,7 @@
package org.apache.qpid.server.queue;
+import org.apache.qpid.server.store.StoreContext;
+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/*
@@ -74,9 +76,9 @@
}
- public QueueEntry add(AMQMessage message)
+ public QueueEntry add(AMQMessage message, final StoreContext storeContext)
{
- QueueEntryImpl node = new QueueEntryImpl(this, message);
+ QueueEntryImpl node = createQueueEntry(message);
for (;;)
{
QueueEntryImpl tail = _tail;
@@ -101,6 +103,11 @@
}
}
+ protected QueueEntryImpl createQueueEntry(AMQMessage message)
+ {
+ return new QueueEntryImpl(this, message);
+ }
+
public QueueEntry next(QueueEntry node)
{
return ((QueueEntryImpl)node).getNext();
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
Tue Jan 5 12:14:40 2010
@@ -22,7 +22,6 @@
import junit.framework.TestCase;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.queue.MockQueueEntry;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleQueueEntryList;
import org.apache.qpid.server.queue.MockAMQMessage;
@@ -38,7 +37,6 @@
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.LinkedList;
-import java.util.Iterator;
/**
* QPID-1385 : Race condition between added to unacked map and resending due
to a rollback.
@@ -78,7 +76,7 @@
{
AMQMessage msg = new MockAMQMessage(id);
- list.add(msg);
+ list.add(msg, new StoreContext());
//Increment ID;
id++;
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Tue Jan 5 12:14:40 2010
@@ -292,6 +292,11 @@
return false; //To change body of implemented methods use
File | Settings | File Templates.
}
+ public boolean isAvailable()
+ {
+ return false; //To change body of implemented methods use
File | Settings | File Templates.
+ }
+
public boolean acquire()
{
return false; //To change body of implemented methods use
File | Settings | File Templates.
Modified:
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java?rev=896017&r1=896016&r2=896017&view=diff
==============================================================================
---
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
(original)
+++
qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
Tue Jan 5 12:14:40 2010
@@ -114,6 +114,11 @@
return false;
}
+ public boolean isAvailable()
+ {
+ return false;
+ }
+
public boolean isDeleted()
{
return false;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]