Author: ritchiem
Date: Tue Apr 14 15:54:16 2009
New Revision: 764850
URL: http://svn.apache.org/viewvc?rev=764850&view=rev
Log:
QPID-1807 : Add 0.5-fix broker and update SlowMessageStore to use MessageStores
rather than TransactionLogs
Added:
qpid/trunk/qpid/java/broker/ (props changed)
- copied from r764833, qpid/branches/0.5-fix/qpid/java/broker/
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
Propchange: qpid/trunk/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Apr 14 15:54:16 2009
@@ -0,0 +1,9 @@
+*.iml
+intellijclasses
+log
+target
+release
+qpid-broker.ipr
+qpid-broker.iws
+.classpath
+.project
Propchange: qpid/trunk/qpid/java/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Tue Apr 14 15:54:16 2009
@@ -0,0 +1 @@
+/qpid/trunk/qpid/java/broker:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764109,764140,764790
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=764850&r1=764849&r2=764850&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Tue Apr 14 15:54:16 2009
@@ -31,62 +31,54 @@
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.routing.RoutingTable;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.ArrayList;
-public class SlowMessageStore implements TransactionLog, RoutingTable
+public class SlowMessageStore implements MessageStore
{
private static final Logger _logger =
Logger.getLogger(SlowMessageStore.class);
private static final String DELAYS = "delays";
private HashMap<String, Long> _preDelays = new HashMap<String, Long>();
private HashMap<String, Long> _postDelays = new HashMap<String, Long>();
private long _defaultDelay = 0L;
- private TransactionLog _realTransactionLog = new MemoryMessageStore();
- private RoutingTable _realRoutingTable = (RoutingTable)_realTransactionLog;
+ private MessageStore _realStore = new MemoryMessageStore();
private static final String PRE = "pre";
private static final String POST = "post";
private String DEFAULT_DELAY = "default";
- public Object configure(VirtualHost virtualHost, String base,
VirtualHostConfiguration config) throws Exception
+ public void configure(VirtualHost virtualHost, String base,
VirtualHostConfiguration config) throws Exception
{
- _logger.warn("Starting SlowMessageStore on Virtualhost:" +
virtualHost.getName());
+ _logger.info("Starting SlowMessageStore on Virtualhost:" +
virtualHost.getName());
Configuration delays = config.getStoreConfiguration().subset(DELAYS);
configureDelays(delays);
- String transactionLogClass = config.getTransactionLogClass();
+ String messageStoreClass =
config.getStoreConfiguration().getString("realStore");
if (delays.containsKey(DEFAULT_DELAY))
{
_defaultDelay = delays.getLong(DEFAULT_DELAY);
- _logger.warn("Delay is:" + _defaultDelay);
}
- if (transactionLogClass != null)
+ if (messageStoreClass != null)
{
- Class clazz = Class.forName(transactionLogClass);
- if (clazz != this.getClass())
- {
+ Class clazz = Class.forName(messageStoreClass);
- Object o = clazz.newInstance();
+ Object o = clazz.newInstance();
- if (!(o instanceof TransactionLog))
- {
- throw new ClassCastException("TransactionLog class must
implement " + TransactionLog.class + ". Class " + clazz +
- " does not.");
- }
- _realTransactionLog = (TransactionLog) o;
+ if (!(o instanceof MessageStore))
+ {
+ throw new ClassCastException("Message store class must
implement " + MessageStore.class + ". Class " + clazz +
+ " does not.");
}
+ _realStore = (MessageStore) o;
+ _realStore.configure(virtualHost, base + ".store", config);
+ }
+ else
+ {
+ _realStore.configure(virtualHost, base + ".store", config);
}
-
- // The call to configure may return a new transaction log
- _realTransactionLog = (TransactionLog)
_realTransactionLog.configure(virtualHost, base , config);
-
- return this;
}
private void configureDelays(Configuration config)
@@ -159,35 +151,42 @@
public void close() throws Exception
{
doPreDelay("close");
- _realTransactionLog.close();
+ _realStore.close();
doPostDelay("close");
}
+ public void removeMessage(StoreContext storeContext, Long messageId)
throws AMQException
+ {
+ doPreDelay("removeMessage");
+ _realStore.removeMessage(storeContext, messageId);
+ doPostDelay("removeMessage");
+ }
+
public void createExchange(Exchange exchange) throws AMQException
{
doPreDelay("createExchange");
- _realRoutingTable.createExchange(exchange);
+ _realStore.createExchange(exchange);
doPostDelay("createExchange");
}
public void removeExchange(Exchange exchange) throws AMQException
{
doPreDelay("removeExchange");
- _realRoutingTable.removeExchange(exchange);
+ _realStore.removeExchange(exchange);
doPostDelay("removeExchange");
}
public void bindQueue(Exchange exchange, AMQShortString routingKey,
AMQQueue queue, FieldTable args) throws AMQException
{
doPreDelay("bindQueue");
- _realRoutingTable.bindQueue(exchange, routingKey, queue, args);
+ _realStore.bindQueue(exchange, routingKey, queue, args);
doPostDelay("bindQueue");
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey,
AMQQueue queue, FieldTable args) throws AMQException
{
doPreDelay("unbindQueue");
- _realRoutingTable.unbindQueue(exchange, routingKey, queue, args);
+ _realStore.unbindQueue(exchange, routingKey, queue, args);
doPostDelay("unbindQueue");
}
@@ -199,100 +198,101 @@
public void createQueue(AMQQueue queue, FieldTable arguments) throws
AMQException
{
doPreDelay("createQueue");
- _realRoutingTable.createQueue(queue, arguments);
+ _realStore.createQueue(queue, arguments);
doPostDelay("createQueue");
}
public void removeQueue(AMQQueue queue) throws AMQException
{
doPreDelay("removeQueue");
- _realRoutingTable.removeQueue(queue);
+ _realStore.removeQueue(queue);
doPostDelay("removeQueue");
}
- public void enqueueMessage(StoreContext context, ArrayList<AMQQueue>
queues, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, AMQQueue queue, Long
messageId) throws AMQException
{
doPreDelay("enqueueMessage");
- _realTransactionLog.enqueueMessage(context, queues, messageId);
+ _realStore.enqueueMessage(context, queue, messageId);
doPostDelay("enqueueMessage");
}
public void dequeueMessage(StoreContext context, AMQQueue queue, Long
messageId) throws AMQException
{
doPreDelay("dequeueMessage");
- _realTransactionLog.dequeueMessage(context, queue, messageId);
- doPostDelay("dequeueMessage");
- }
-
- public void removeMessage(StoreContext context, Long messageId) throws
AMQException
- {
- doPreDelay("dequeueMessage");
- _realTransactionLog.removeMessage(context, messageId);
+ _realStore.dequeueMessage(context, queue, messageId);
doPostDelay("dequeueMessage");
}
public void beginTran(StoreContext context) throws AMQException
{
doPreDelay("beginTran");
- _realTransactionLog.beginTran(context);
+ _realStore.beginTran(context);
doPostDelay("beginTran");
}
public void commitTran(StoreContext context) throws AMQException
{
doPreDelay("commitTran");
- _realTransactionLog.commitTran(context);
+ _realStore.commitTran(context);
doPostDelay("commitTran");
}
public void abortTran(StoreContext context) throws AMQException
{
doPreDelay("abortTran");
- _realTransactionLog.abortTran(context);
+ _realStore.abortTran(context);
doPostDelay("abortTran");
}
public boolean inTran(StoreContext context)
{
doPreDelay("inTran");
- boolean b = _realTransactionLog.inTran(context);
+ boolean b = _realStore.inTran(context);
doPostDelay("inTran");
return b;
}
+ public Long getNewMessageId()
+ {
+ doPreDelay("getNewMessageId");
+ Long l = _realStore.getNewMessageId();
+ doPostDelay("getNewMessageId");
+ return l;
+ }
+
public void storeContentBodyChunk(StoreContext context, Long messageId,
int index, ContentChunk contentBody, boolean lastContentBody) throws
AMQException
{
doPreDelay("storeContentBodyChunk");
- _realTransactionLog.storeContentBodyChunk(context, messageId, index,
contentBody, lastContentBody);
+ _realStore.storeContentBodyChunk(context, messageId, index,
contentBody, lastContentBody);
doPostDelay("storeContentBodyChunk");
}
public void storeMessageMetaData(StoreContext context, Long messageId,
MessageMetaData messageMetaData) throws AMQException
{
doPreDelay("storeMessageMetaData");
- _realTransactionLog.storeMessageMetaData(context, messageId,
messageMetaData);
+ _realStore.storeMessageMetaData(context, messageId, messageMetaData);
doPostDelay("storeMessageMetaData");
}
-// public MessageMetaData getMessageMetaData(StoreContext context, Long
messageId) throws AMQException
-// {
-// doPreDelay("getMessageMetaData");
-// MessageMetaData mmd =
_realTransactionLog.getMessageMetaData(context, messageId);
-// doPostDelay("getMessageMetaData");
-// return mmd;
-// }
-//
-// public ContentChunk getContentBodyChunk(StoreContext context, Long
messageId, int index) throws AMQException
-// {
-// doPreDelay("getContentBodyChunk");
-// ContentChunk c = _realTransactionLog.getContentBodyChunk(context,
messageId, index);
-// doPostDelay("getContentBodyChunk");
-// return c;
-// }
-//
+ public MessageMetaData getMessageMetaData(StoreContext context, Long
messageId) throws AMQException
+ {
+ doPreDelay("getMessageMetaData");
+ MessageMetaData mmd = _realStore.getMessageMetaData(context,
messageId);
+ doPostDelay("getMessageMetaData");
+ return mmd;
+ }
+
+ public ContentChunk getContentBodyChunk(StoreContext context, Long
messageId, int index) throws AMQException
+ {
+ doPreDelay("getContentBodyChunk");
+ ContentChunk c = _realStore.getContentBodyChunk(context, messageId,
index);
+ doPostDelay("getContentBodyChunk");
+ return c;
+ }
+
public boolean isPersistent()
{
- return _realTransactionLog.isPersistent();
+ return _realStore.isPersistent();
}
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]