Author: ritchiem
Date: Wed Mar 25 18:39:25 2009
New Revision: 758397
URL: http://svn.apache.org/viewvc?rev=758397&view=rev
Log:
QPID-1735 : Added documentation to QueueBackingStore around the thread safety of
load/unload, and updated FileQueueBackingStore (FQBS) to adhere to the thread
safety specified by the interface. QueueEntry was updated to return the
AMQMessage from load() to complete the getMessage() interface, because in a
flowed state the message may be purged before a reference can be taken. Added
new test QueueEntryImplThreadingTest that should later be run for longer but
aims to show that load() always returns the message even when unloads are
occurring asynchronously.
Commit from 0.5-release : r758388
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
Wed Mar 25 18:39:25 2009
@@ -157,10 +157,10 @@
{
try
{
- input.close();
- // We can purge the message here then reflow it if required
but I believe it to be cleaner to leave it
- // on disk until it has been deleted from the queue at that
point we can be sure we won't need the data
- //handle.delete();
+ if (input != null)
+ {
+ input.close();
+ }
}
catch (IOException e)
{
@@ -171,101 +171,123 @@
throw new UnableToRecoverMessageException(error);
}
+ /**
+ * Thread safety is ensured here by synchronizing on the message object.
+ *
+ * This is safe as load() calls will fail until the first thread through
here has created the file on disk
+ * and fully written the content.
+ *
+ * After this point new AMQMessages can exist that reference the same data
thus breaking the synchronisation.
+ *
+ * Thread safety is maintained here as the existence of the file is
checked allowing then subsequent unload() calls
+ * to skip the writing.
+ *
+ * Multiple unload() calls will initially be blocked using the
synchronization until the data exists on disk thus
+ * safely allowing any reference to the message to be cleared prompting a
load call.
+ *
+ * @param message the message to unload
+ * @throws UnableToFlowMessageException
+ */
public void unload(AMQMessage message) throws UnableToFlowMessageException
{
- long messageId = message.getMessageId();
+ // Synchronize on the message to ensure that only one thread can unload
at a time.
+ // If a second unload is attempted then it will block until the first unload
has completed.
+ synchronized (message)
+ {
+ long messageId = message.getMessageId();
- File handle = getFileHandle(messageId);
+ File handle = getFileHandle(messageId);
- //If we have written the data once then we don't need to do it again.
- if (handle.exists())
- {
- if (_log.isDebugEnabled())
+ //If we have written the data once then we don't need to do it
again.
+ if (handle.exists())
{
- _log.debug("Message(ID:" + messageId + ") already unloaded.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message(ID:" + messageId + ") already
unloaded.");
+ }
+ return;
}
- return;
- }
- if (_log.isInfoEnabled())
- {
- _log.info("Unloading Message (ID:" + messageId + ")");
- }
-
- ObjectOutputStream writer = null;
- Exception error = null;
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Unloading Message (ID:" + messageId + ")");
+ }
- try
- {
- writer = new ObjectOutputStream(new FileOutputStream(handle));
+ ObjectOutputStream writer = null;
+ Exception error = null;
- writer.writeLong(message.getArrivalTime());
+ try
+ {
+ writer = new ObjectOutputStream(new FileOutputStream(handle));
- MessagePublishInfo mpi = message.getMessagePublishInfo();
- writer.writeUTF(String.valueOf(mpi.getExchange()));
- writer.writeUTF(String.valueOf(mpi.getRoutingKey()));
- writer.writeBoolean(mpi.isMandatory());
- writer.writeBoolean(mpi.isImmediate());
- ContentHeaderBody chb = message.getContentHeaderBody();
+ writer.writeLong(message.getArrivalTime());
- // write out the content header body
- final int bodySize = chb.getSize();
- byte[] underlying = new byte[bodySize];
- ByteBuffer buf = ByteBuffer.wrap(underlying);
- chb.writePayload(buf);
+ MessagePublishInfo mpi = message.getMessagePublishInfo();
+ writer.writeUTF(String.valueOf(mpi.getExchange()));
+ writer.writeUTF(String.valueOf(mpi.getRoutingKey()));
+ writer.writeBoolean(mpi.isMandatory());
+ writer.writeBoolean(mpi.isImmediate());
+ ContentHeaderBody chb = message.getContentHeaderBody();
+
+ // write out the content header body
+ final int bodySize = chb.getSize();
+ byte[] underlying = new byte[bodySize];
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+ chb.writePayload(buf);
- writer.writeInt(bodySize);
- writer.write(underlying, 0, bodySize);
+ writer.writeInt(bodySize);
+ writer.write(underlying, 0, bodySize);
- int bodyCount = message.getBodyCount();
- writer.writeInt(bodyCount);
+ int bodyCount = message.getBodyCount();
+ writer.writeInt(bodyCount);
- //WriteContentBody
- for (int index = 0; index < bodyCount; index++)
- {
- ContentChunk chunk = message.getContentChunk(index);
- int length = chunk.getSize();
+ //WriteContentBody
+ for (int index = 0; index < bodyCount; index++)
+ {
+ ContentChunk chunk = message.getContentChunk(index);
+ int length = chunk.getSize();
- byte[] chunk_underlying = new byte[length];
+ byte[] chunk_underlying = new byte[length];
- ByteBuffer chunk_buf = chunk.getData();
+ ByteBuffer chunk_buf = chunk.getData();
- chunk_buf.duplicate().rewind().get(chunk_underlying);
+ chunk_buf.duplicate().rewind().get(chunk_underlying);
- writer.writeInt(length);
- writer.write(chunk_underlying, 0, length);
+ writer.writeInt(length);
+ writer.write(chunk_underlying, 0, length);
+ }
}
- }
- catch (FileNotFoundException e)
- {
- error = e;
- }
- catch (IOException e)
- {
- error = e;
- }
- finally
- {
- // In a FileNotFound situation writer will be null.
- if (writer != null)
+ catch (FileNotFoundException e)
{
- try
- {
- writer.flush();
- writer.close();
- }
- catch (IOException e)
+ error = e;
+ }
+ catch (IOException e)
+ {
+ error = e;
+ }
+ finally
+ {
+ // In a FileNotFound situation writer will be null.
+ if (writer != null)
{
- error = e;
+ try
+ {
+ writer.flush();
+ writer.close();
+ }
+ catch (IOException e)
+ {
+ error = e;
+ }
}
}
- }
- if (error != null)
- {
- _log.error("Unable to unload message(" + messageId + ") to disk,
restoring state.");
- handle.delete();
- throw new UnableToFlowMessageException(messageId, error);
+ if (error != null)
+ {
+ _log.error("Unable to unload message(" + messageId + ") to
disk, restoring state.");
+ handle.delete();
+ throw new UnableToFlowMessageException(messageId, error);
+ }
}
}
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
Wed Mar 25 18:39:25 2009
@@ -166,7 +166,7 @@
{
// If we've increased the minimum memory above what we have in memory
then
// we need to inhale more if there is more
- if (_atomicQueueInMemory.get() < _memoryUsageMinimum &&
_atomicQueueSize.get() > 0)
+ if (!_disabled && _atomicQueueInMemory.get() < _memoryUsageMinimum &&
_atomicQueueSize.get() > 0)
{
startInhaler();
}
@@ -204,7 +204,7 @@
*/
public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
{
- if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ if (!_disabled &&
_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
{
_log.error("InMemory Count just went below 0:" +
queueEntry.debugIdentity());
}
@@ -219,7 +219,7 @@
*/
public void entryLoadedUpdateMemory(QueueEntry queueEntry)
{
- if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) >
_memoryUsageMaximum)
+ if (!_disabled && _atomicQueueInMemory.addAndGet(queueEntry.getSize())
> _memoryUsageMaximum)
{
_log.error("Loaded to much data!:" + _atomicQueueInMemory.get() +
"/" + _memoryUsageMaximum);
setFlowed(true);
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
Wed Mar 25 18:39:25 2009
@@ -26,8 +26,36 @@
public interface QueueBackingStore
{
+ /**
+ * Retrieve the message with a given ID
+ *
+ * This method must be thread safe.
+ *
+ * Multiple calls to load with a given messageId DO NOT need to return the
same object.
+ *
+ * @param messageId the id of the message to retrieve.
+ * @return
+ */
AMQMessage load(Long messageId);
+ /**
+ * Store a message in the BackingStore.
+ *
+ * This method must be thread safe understanding that multiple message
objects may be the same data.
+ *
+ * Allowing a thread to return from this method means that it is safe to
call load()
+ *
+ * Implementer guide:
+ * Until the message has been loaded the message references will all be
the same object.
+ *
+ * One approach as taken by the @see FileQueueBackingStore is to block
simultaneous calls to this method
+ * until the message is fully on disk. This can be done by synchronising
on message as initially it is always the
+ * same object. Only after a load has taken place will there be a
discrepancy.
+ *
+ *
+ * @param message the message to unload
+ * @throws UnableToFlowMessageException
+ */
void unload(AMQMessage message) throws UnableToFlowMessageException;
void delete(Long messageId);
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Wed Mar 25 18:39:25 2009
@@ -226,7 +226,7 @@
void unload();
- void load();
+ AMQMessage load();
boolean isFlowed();
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Wed Mar 25 18:39:25 2009
@@ -31,6 +31,7 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class QueueEntryImpl implements QueueEntry
@@ -41,7 +42,7 @@
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
+ private AtomicReference<AMQMessage> _messageRef;
private boolean _redelivered;
@@ -102,7 +103,7 @@
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage
message)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _messageRef = new AtomicReference<AMQMessage>(message);
if (message != null)
{
_messageId = message.getMessageId();
@@ -136,11 +137,7 @@
public AMQMessage getMessage()
{
- if (_message == null)
- {
- return _backingStore.load(_messageId);
- }
- return _message;
+ return load();
}
public Long getMessageId()
@@ -231,13 +228,17 @@
public String debugIdentity()
{
String entry = "[State:" + _state.getState().name() + "]";
- if (_message == null)
+
+ AMQMessage message = _messageRef.get();
+
+ if (message == null)
{
return entry + "(Message Unloaded ID:" + _messageId + ")";
}
else
{
- return entry + _message.debugIdentity();
+
+ return entry + message.debugIdentity();
}
}
@@ -398,23 +399,27 @@
public void unload()
{
- if (_message != null && _backingStore != null)
- {
+ //Get the currently loaded message
+ AMQMessage message = _messageRef.get();
+ // If we have a message in memory and we have a valid backingStore
attempt to unload
+ if (message != null && _backingStore != null)
+ {
try
{
- if (!_hasBeenUnloaded)
+ // The backingStore will now handle concurrent calls to unload
and safely synchronize to ensure
+ // multiple initial unloads are unloads
+ _backingStore.unload(message);
+ _hasBeenUnloaded = true;
+ _messageRef.set(null);
+
+ if (_log.isDebugEnabled())
{
- _hasBeenUnloaded = true;
+ _log.debug("Unloaded:" + debugIdentity());
+ }
- _backingStore.unload(_message);
- if (_log.isDebugEnabled())
- {
- _log.debug("Unloaded:" + debugIdentity());
- }
- }
- _message = null;
+ // Clear the message reference if the loaded message is still
the one we are processing.
//Update the memoryState if this load call resulted in the
message being purged from memory
if (!_flowed.getAndSet(true))
@@ -434,23 +439,56 @@
}
}
- public void load()
+ public AMQMessage load()
{
+ // MessageId and Backing store are null in test scenarios, normally
this is not the case.
if (_messageId != null && _backingStore != null)
{
- _message = _backingStore.load(_messageId);
+ // See if we have the message currently in memory to return
+ AMQMessage message = _messageRef.get();
+ // if we don't then we need to start a load process.
+ if (message == null)
+ {
+ //Synchronize here to ensure only the first thread that
attempts to load will perform the load from the
+ // backing store.
+ synchronized (this)
+ {
+ // Check again to see if someone else ahead of us loaded
the message
+ message = _messageRef.get();
+ // if we still don't have the message then we need to
start a load process.
+ if (message == null)
+ {
+ // Load the message and keep a reference to it
+ message = _backingStore.load(_messageId);
+ // Set the message reference
+ _messageRef.set(message);
+ }
+ else
+ {
+ // If someone else loaded the message then we can jump
out here as the Memory Updates will
+ // have been performed by the loading thread
+ return message;
+ }
+ }
- if (_log.isDebugEnabled())
- {
- _log.debug("Loaded:" + debugIdentity());
- }
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Loaded:" + debugIdentity());
+ }
- //Update the memoryState if this load call resulted in the message
comming in to memory
- if (_flowed.getAndSet(false))
- {
- _queueEntryList.entryLoadedUpdateMemory(this);
+ //Update the memoryState if this load call resulted in the
message comming in to memory
+ if (_flowed.getAndSet(false))
+ {
+ _queueEntryList.entryLoadedUpdateMemory(this);
+ }
}
+
+ // Return the message that was either already in memory or the
value we just loaded.
+ return message;
}
+ // This can be null but only in the case where we have no messageId
+ // in the case where we have no backingStore then we will never have
unloaded the message
+ return _messageRef.get();
}
public boolean isFlowed()
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
Wed Mar 25 18:39:25 2009
@@ -32,6 +32,7 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.AMQException;
import java.util.Map;
@@ -96,6 +97,12 @@
assertEquals("Map does not contain correct setup data",
INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
}
+ public void tearDown() throws Exception
+ {
+ //Ensure we close the registry that the MockAMQQueue will create
+ ApplicationRegistry.getInstance().close();
+ }
+
/**
* Helper method to create a new subscription and aquire the given
messages.
*
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
Wed Mar 25 18:39:25 2009
@@ -36,11 +36,9 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.FailedDequeueException;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageCleanupException;
import org.apache.qpid.server.queue.MockProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.UnableToFlowMessageException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -356,9 +354,9 @@
//To change body of implemented methods use File |
Settings | File Templates.
}
- public void load()
+ public AMQMessage load()
{
- //To change body of implemented methods use File |
Settings | File Templates.
+ return null;
}
public boolean isFlowed()
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
Wed Mar 25 18:39:25 2009
@@ -23,10 +23,18 @@
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.MockQueueEntry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
public class PropertyExpressionTest extends TestCase
{
+ public void tearDown() throws Exception
+ {
+ //Ensure we close the registry that the MockQueueEntry will create
+ ApplicationRegistry.remove(1);
+ }
+
+
public void testJMSRedelivered()
{
PropertyExpression<AMQException> pe = new
PropertyExpression<AMQException>("JMSRedelivered");
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
Wed Mar 25 18:39:25 2009
@@ -23,6 +23,14 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
+
+import java.util.LinkedList;
+import java.util.ArrayList;
public class MockAMQMessage extends TransientAMQMessage
{
@@ -31,6 +39,14 @@
{
super(messageId);
_messagePublishInfo = new
MessagePublishInfoImpl(null,false,false,null);
+ BasicContentHeaderProperties properties = new
BasicContentHeaderProperties();
+
+ properties.setMessageId(String.valueOf(messageId));
+ properties.setTimestamp(System.currentTimeMillis());
+ properties.setDeliveryMode((byte)1);
+
+ _contentHeaderBody = new ContentHeaderBody(properties,
BasicPublishBodyImpl.CLASS_ID);
+ _contentBodies = new ArrayList<ContentChunk>();
}
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
Wed Mar 25 18:39:25 2009
@@ -20,16 +20,18 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import java.util.List;
import java.util.Set;
@@ -39,10 +41,20 @@
private boolean _deleted = false;
private int _queueCount;
private AMQShortString _name;
+ private VirtualHost _virtualhost;
public MockAMQQueue(String name)
{
- _name = new AMQShortString(name);
+ _name = new AMQShortString(name);
+ _virtualhost =
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+ try
+ {
+ _virtualhost.getQueueRegistry().registerQueue(this);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ }
}
public AMQShortString getName()
@@ -67,7 +79,7 @@
public VirtualHost getVirtualHost()
{
- return null; //To change body of implemented methods use File |
Settings | File Templates.
+ return _virtualhost;
}
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable
arguments) throws AMQException
@@ -152,7 +164,7 @@
public int delete() throws AMQException
{
- return 0; //To change body of implemented methods use File | Settings
| File Templates.
+ return 0; //To change body of implemented methods use File | Settings
| File Templates.
}
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message)
throws AMQException
@@ -343,7 +355,7 @@
public void setMinimumAlertRepeatGap(long value)
{
-
+
}
}
Modified:
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java?rev=758397&r1=758396&r2=758397&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
(original)
+++
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
Wed Mar 25 18:39:25 2009
@@ -40,6 +40,7 @@
import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.MockProtocolSession;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
public class ACLManagerTest extends TestCase
{
@@ -67,6 +68,12 @@
_session = new MockProtocolSession(new TestableMemoryMessageStore());
}
+
+ public void tearDown() throws Exception
+ {
+ //Ensure we close the registry that the MockAMQQueue will create
+ ApplicationRegistry.getInstance().close();
+ }
public void testACLManagerConfigurationPluginManager() throws Exception
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]