Author: ritchiem
Date: Mon Apr 13 17:28:36 2009
New Revision: 764539
URL: http://svn.apache.org/viewvc?rev=764539&view=rev
Log:
753253 ritchiem QPID-1730 : Logging update highlighted that we were
printing the queue before we had fully initialised it.
753220 ritchiem QPID-1730 : Improve the logging so we can see what is
going one during persistent recovery as we do not have a working persistent
module.
753219 ritchiem QPID-1730 : Update the order in which we initialise. We
now load the config file from disk then recover from the persistent strore.
This approach applies the vhost configuration and then applies the persistent
state from the store to those objects rather than recreating them. The new
inner classes on VirtualHost are to be removed once we have fully extracted the
RoutingTable from the legacy MessageStores as this is the root of the problem.
The Store needs to be open to create new durable objects but the current stores
must recover their state before new entries are added. So now the persistent
state is being loaded on to a broker in a consistent state after it has
configured a) its default exchanges and b) loaded the queue config from the
config file. Eventually we will only have one location for queue config and all
will be right in the world.
merged the above three changes from trunk that allow the broker to create it's
model before the MessageStore is initialised.
Modified:
qpid/branches/0.5-fix/qpid/java/broker/bin/ (props changed)
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
(props changed)
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
(props changed)
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
Propchange: qpid/branches/0.5-fix/qpid/java/broker/bin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 13 17:28:36 2009
@@ -1,2 +1,2 @@
/qpid/branches/0.5-release/qpid/java/broker/bin:757268
-/qpid/trunk/qpid/java/broker/bin:758730,760919,761721,762365,762992,763959
+/qpid/trunk/qpid/java/broker/bin:753219-753220,753253,758730,760919,761721,762365,762992,763959
Modified:
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=764539&r1=764538&r2=764539&view=diff
==============================================================================
---
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
(original)
+++
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
Mon Apr 13 17:28:36 2009
@@ -191,7 +191,7 @@
public String toString()
{
- return getClass().getName() + "[" + getName() +"]";
+ return getClass().getSimpleName() + "[" + getName() +"]";
}
public ManagedObject getManagedObject()
Modified:
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=764539&r1=764538&r2=764539&view=diff
==============================================================================
---
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
(original)
+++
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
Mon Apr 13 17:28:36 2009
@@ -168,16 +168,14 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Queue (" + queue.getName() + ")" + queue + " is
already registered with routing key " + routingKey);
+ _logger.debug("Queue (" + queue + ") is already registered
with routing key " + routingKey);
}
}
else
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Binding queue(" + queue.getName() + ") " +
queue + " with routing key " + routingKey
- + (args == null ? "" : " and arguments " +
args.toString())
- + " to exchange " + this);
+ _logger.debug("Binding queue:" + queue + " with routing key '"
+ routingKey +"' to exchange:" + this);
}
}
}
Propchange:
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 13 17:28:36 2009
@@ -1,2 +1,2 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management:757268
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:758730,760919,761721,762365,762992,763959
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,760919,761721,762365,762992,763959
Propchange:
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Apr 13 17:28:36 2009
@@ -1,2 +1,2 @@
/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:757257
-/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:757270,758730,760919,761721,762365,762992,763959
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:753219-753220,753253,757270,758730,760919,761721,762365,762992,763959
Modified:
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=764539&r1=764538&r2=764539&view=diff
==============================================================================
---
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
(original)
+++
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
Mon Apr 13 17:28:36 2009
@@ -67,5 +67,4 @@
}
}
-
}
Modified:
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=764539&r1=764538&r2=764539&view=diff
==============================================================================
---
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
(original)
+++
qpid/branches/0.5-fix/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
Mon Apr 13 17:28:36 2009
@@ -20,19 +20,14 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.Collections;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.management.NotCompliantMBeanException;
-
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.configuration.ExchangeConfiguration;
import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -50,6 +45,7 @@
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.access.ACLManager;
import org.apache.qpid.server.security.access.Accessable;
@@ -57,12 +53,19 @@
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import
org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import javax.management.NotCompliantMBeanException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
public class VirtualHost implements Accessable
{
private static final Logger _logger = Logger.getLogger(VirtualHost.class);
-
private final String _name;
private ConnectionRegistry _connectionRegistry;
@@ -84,7 +87,6 @@
private ACLManager _accessManager;
private final Timer _houseKeepingTimer;
-
private VirtualHostConfiguration _configuration;
public void setAccessableName(String name)
@@ -134,15 +136,26 @@
return VirtualHost.this;
}
-
} // End of MBean class
+ /**
+ * Normal Constructor
+ *
+ * @param hostConfig
+ *
+ * @throws Exception
+ */
+ public VirtualHost(VirtualHostConfiguration hostConfig) throws Exception
+ {
+ this(hostConfig, null);
+ }
public VirtualHost(VirtualHostConfiguration hostConfig, MessageStore
store) throws Exception
{
_configuration = hostConfig;
- _name = hostConfig.getName();
- if (_name == null || _name.length() == 0)
+ _name = hostConfig.getName();
+
+ if (_name == null || _name.length() == 0)
{
throw new IllegalArgumentException("Illegal name (" + _name +
") for virtualhost.");
}
@@ -151,8 +164,27 @@
_connectionRegistry = new ConnectionRegistry(this);
- _houseKeepingTimer = new Timer("Queue-housekeeping-"+_name, true);
-
+ _houseKeepingTimer = new Timer("Queue-housekeeping-" + _name, true);
+
+ _queueRegistry = new DefaultQueueRegistry(this);
+
+ _exchangeFactory = new DefaultExchangeFactory(this);
+ _exchangeFactory.initialise(hostConfig);
+
+ _exchangeRegistry = new DefaultExchangeRegistry(this);
+
+ //Create a temporary RT to store the durable entries from the config
file
+ // so we can replay them in to the real _RT after it has been loaded.
+ /// This should be removed after the _RT has been fully split from the
the TL
+
+ StartupRoutingTable configFileRT = new StartupRoutingTable();
+
+ _messageStore = configFileRT;
+
+ // This needs to be after the RT has been defined as it creates the
default durable exchanges.
+ _exchangeRegistry.initialise();
+ initialiseModel(hostConfig);
+
if (store != null)
{
_messageStore = store;
@@ -165,40 +197,44 @@
}
initialiseMessageStore(hostConfig);
}
-
- _queueRegistry = new DefaultQueueRegistry(this);
-
- _exchangeFactory = new DefaultExchangeFactory(this);
- _exchangeFactory.initialise(hostConfig);
- _exchangeRegistry = new DefaultExchangeRegistry(this);
- _exchangeRegistry.initialise();
-
- initialiseModel(hostConfig);
+
+ //Now that the RT has been initialised loop through the persistent
queues/exchanges created from the config
+ // file and write them in to the new routing Table.
+ for (StartupRoutingTable.CreateQueueTuple cqt : configFileRT.queue)
+ {
+ _messageStore.createQueue(cqt.queue, cqt.arguments);
+ }
+
+ for (Exchange exchange : configFileRT.exchange)
+ {
+ _messageStore.createExchange(exchange);
+ }
+
+ for (StartupRoutingTable.CreateBindingTuple cbt :
configFileRT.bindings)
+ {
+ _messageStore.bindQueue(cbt.exchange, cbt.routingKey, cbt.queue,
cbt.arguments);
+ }
+
_authenticationManager = new
PrincipalDatabaseAuthenticationManager(_name, hostConfig);
_accessManager = ApplicationRegistry.getInstance().getAccessManager();
_accessManager.configureHostPlugins(hostConfig.getSecurityConfiguration());
-
+
_brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
_brokerMBean.register();
initialiseHouseKeeping(hostConfig.getHousekeepingExpiredMessageCheckPeriod());
}
- public VirtualHost(VirtualHostConfiguration virtualHostConfig) throws
Exception
- {
- this(virtualHostConfig, null);
- }
-
private void initialiseHouseKeeping(long period)
{
/* add a timer task to iterate over queues, cleaning expired messages
from queues with no consumers */
- if(period != 0L)
+ if (period != 0L)
{
class RemoveExpiredMessagesTask extends TimerTask
{
public void run()
{
- for(AMQQueue q : _queueRegistry.getQueues())
+ for (AMQQueue q : _queueRegistry.getQueues())
{
try
@@ -207,7 +243,7 @@
}
catch (AMQException e)
{
- _logger.error("Exception in housekeeping for
queue: " + q.getName().toString(),e);
+ _logger.error("Exception in housekeeping for
queue: " + q.getName().toString(), e);
throw new RuntimeException(e);
}
}
@@ -215,8 +251,8 @@
}
_houseKeepingTimer.scheduleAtFixedRate(new
RemoveExpiredMessagesTask(),
- period/2,
- period);
+ period / 2,
+ period);
}
}
@@ -232,47 +268,48 @@
throw new ClassCastException("Message store class must implement "
+ MessageStore.class + ". Class " + clazz +
" does not.");
}
- _messageStore = (MessageStore) o;
- _messageStore.configure(this, "store", hostConfig);
+ MessageStore messageStore = (MessageStore) o;
+ messageStore.configure(this, "store", hostConfig);
+ _messageStore = messageStore;
}
private void initialiseModel(VirtualHostConfiguration config) throws
ConfigurationException, AMQException
{
- _logger.debug("Loading configuration for virtualhost:
"+config.getName());
+ _logger.debug("Loading configuration for virtualhost: " +
config.getName());
List exchangeNames = config.getExchanges();
- for(Object exchangeNameObj : exchangeNames)
- {
- String exchangeName = String.valueOf(exchangeNameObj);
-
configureExchange(config.getExchangeConfiguration(exchangeName));
- }
+ for (Object exchangeNameObj : exchangeNames)
+ {
+ String exchangeName = String.valueOf(exchangeNameObj);
+ configureExchange(config.getExchangeConfiguration(exchangeName));
+ }
String[] queueNames = config.getQueueNames();
- for(Object queueNameObj : queueNames)
- {
- String queueName = String.valueOf(queueNameObj);
- configureQueue(config.getQueueConfiguration(queueName));
- }
+ for (Object queueNameObj : queueNames)
+ {
+ String queueName = String.valueOf(queueNameObj);
+ configureQueue(config.getQueueConfiguration(queueName));
+ }
}
private void configureExchange(ExchangeConfiguration
exchangeConfiguration) throws AMQException
{
AMQShortString exchangeName = new
AMQShortString(exchangeConfiguration.getName());
- Exchange exchange;
- exchange = _exchangeRegistry.getExchange(exchangeName);
- if(exchange == null)
- {
+ Exchange exchange;
+ exchange = _exchangeRegistry.getExchange(exchangeName);
+ if (exchange == null)
+ {
AMQShortString type = new
AMQShortString(exchangeConfiguration.getType());
boolean durable = exchangeConfiguration.getDurable();
boolean autodelete = exchangeConfiguration.getAutoDelete();
- Exchange newExchange =
_exchangeFactory.createExchange(exchangeName,type,durable,autodelete,0);
- _exchangeRegistry.registerExchange(newExchange);
- }
+ Exchange newExchange =
_exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
+ _exchangeRegistry.registerExchange(newExchange);
+ }
}
private void configureQueue(QueueConfiguration queueConfiguration) throws
AMQException, ConfigurationException
@@ -288,33 +325,36 @@
Exchange exchange = _exchangeRegistry.getExchange(exchangeName == null
? null : new AMQShortString(exchangeName));
- if(exchange == null)
- {
- exchange = _exchangeRegistry.getDefaultExchange();
- }
+ if (exchange == null)
+ {
+ exchange = _exchangeRegistry.getDefaultExchange();
+ }
if (exchange == null)
{
throw new ConfigurationException("Attempt to bind queue to
unknown exchange:" + exchangeName);
}
- List routingKeys = queueConfiguration.getRoutingKeys();
- if(routingKeys == null || routingKeys.isEmpty())
- {
- routingKeys = Collections.singletonList(queue.getName());
- }
+ List routingKeys = queueConfiguration.getRoutingKeys();
+ if (routingKeys == null || routingKeys.isEmpty())
+ {
+ routingKeys = Collections.singletonList(queue.getName());
+ }
- for(Object routingKeyNameObj : routingKeys)
- {
- AMQShortString routingKey = new
AMQShortString(String.valueOf(routingKeyNameObj));
- queue.bind(exchange, routingKey, null);
- _logger.info("Queue '" + queue.getName() + "' bound to
exchange:" + exchangeName + " RK:'" + routingKey + "'");
- }
+ for (Object routingKeyNameObj : routingKeys)
+ {
+ AMQShortString routingKey = new
AMQShortString(String.valueOf(routingKeyNameObj));
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Binding queue:" + queue + " with routing key '"
+ routingKey + "' to exchange:" + this);
+ }
+ queue.bind(exchange, routingKey, null);
+ }
- if(exchange != _exchangeRegistry.getDefaultExchange())
- {
- queue.bind(_exchangeRegistry.getDefaultExchange(),
queue.getName(), null);
- }
+ if (exchange != _exchangeRegistry.getDefaultExchange())
+ {
+ queue.bind(_exchangeRegistry.getDefaultExchange(),
queue.getName(), null);
+ }
}
public String getName()
@@ -355,7 +395,7 @@
public ACLManager getAccessManager()
{
return _accessManager;
- }
+ }
public void close() throws Exception
{
@@ -394,4 +434,160 @@
{
return _virtualHostMBean;
}
+
+ /**
+ * Temporary Startup RT class to record the creation of persistent queues
/ exchanges.
+ *
+ *
+ * This is so we can replay the creation of queues/exchanges in to the
real _RT after it has been loaded.
+ * This should be removed after the _RT has been fully split from the the
TL
+ */
+ private class StartupRoutingTable implements MessageStore
+ {
+ public List<Exchange> exchange = new LinkedList<Exchange>();
+ public List<CreateQueueTuple> queue = new
LinkedList<CreateQueueTuple>();
+ public List<CreateBindingTuple> bindings = new
LinkedList<CreateBindingTuple>();
+
+ public void configure(VirtualHost virtualHost, String base,
VirtualHostConfiguration config) throws Exception
+ {
+ }
+
+ public void close() throws Exception
+ {
+ }
+
+ public void removeMessage(StoreContext storeContext, Long messageId)
throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ if (exchange.isDurable())
+ {
+ this.exchange.add(exchange);
+ }
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey,
AMQQueue queue, FieldTable args) throws AMQException
+ {
+ if (exchange.isDurable() && queue.isDurable())
+ {
+ bindings.add(new CreateBindingTuple(exchange, routingKey,
queue, args));
+ }
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey,
AMQQueue queue, FieldTable args) throws AMQException
+ {
+ }
+
+ public void createQueue(AMQQueue queue) throws AMQException
+ {
+ createQueue(queue, null);
+ }
+
+ public void createQueue(AMQQueue queue, FieldTable arguments) throws
AMQException
+ {
+ if (queue.isDurable())
+ {
+ this.queue.add(new CreateQueueTuple(queue, arguments));
+ }
+ }
+
+ public void removeQueue(AMQQueue queue) throws AMQException
+ {
+ }
+
+ public void enqueueMessage(StoreContext context, AMQQueue queue, Long
messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public void dequeueMessage(StoreContext context, AMQQueue queue, Long
messageId) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public void beginTran(StoreContext context) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public void commitTran(StoreContext context) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public void abortTran(StoreContext context) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public boolean inTran(StoreContext context)
+ {
+ return false; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public Long getNewMessageId()
+ {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public void storeContentBodyChunk(StoreContext context, Long
messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws
AMQException
+ {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public void storeMessageMetaData(StoreContext context, Long messageId,
MessageMetaData messageMetaData) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File
Templates.
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long
messageId) throws AMQException
+ {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public ContentChunk getContentBodyChunk(StoreContext context, Long
messageId, int index) throws AMQException
+ {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ public boolean isPersistent()
+ {
+ return false; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
+ private class CreateQueueTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+
+ public CreateQueueTuple(AMQQueue queue, FieldTable arguments)
+ {
+ this.queue = queue;
+ this.arguments = arguments;
+ }
+ }
+
+ private class CreateBindingTuple
+ {
+ public AMQQueue queue;
+ public FieldTable arguments;
+ public Exchange exchange;
+ public AMQShortString routingKey;
+
+ public CreateBindingTuple(Exchange exchange, AMQShortString
routingKey, AMQQueue queue, FieldTable args)
+ {
+ this.exchange = exchange;
+ this.routingKey = routingKey;
+ this.queue = queue;
+ arguments = args;
+ }
+ }
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]