Author: kpvdr
Date: Fri May 8 14:25:34 2009
New Revision: 773004
URL: http://svn.apache.org/viewvc?rev=773004&view=rev
Log:
Fixed a cluster store problem where the second and subsequent persistent
nodes to join a cluster fail with an "Exchange already exists: amq.direct
(MessageStoreImpl.cpp:488)" message. To fix this, a new method, discardInit(),
was added to MessageStore; it throws away all restored data (if any) and
restarts the store as though no restore had taken place.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri May 8 14:25:34 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -114,11 +114,11 @@
("staging-threshold", optValue(stagingThreshold, "N"), "Stages
messages over N bytes to disk")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"),
"Management Publish Interval")
- ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
+ ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
"Interval between attempts to purge any expired messages from queues")
("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled
all incoming connections will be trusted")
("realm", optValue(realm, "REALM"), "Use the given realm when
performing authentication")
- ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default
maximum size for queues (in bytes)")
+ ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default
maximum size for queues (in bytes)")
("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP
connections")
("require-encryption", optValue(requireEncrypted), "Only accept
connections that are encrypted")
("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to
send as 'known-hosts' to clients ('none' implies empty list)")
@@ -176,7 +176,7 @@
mgmtObject->set_dataDir(dataDir.getPath());
else
mgmtObject->clr_dataDir();
-
+
managementAgent->addObject(mgmtObject, 0x1000000000000002LL);
// Since there is currently no support for virtual hosts, a
placeholder object
@@ -218,12 +218,14 @@
// The cluster plug-in will setRecovery(false) on all but the first
// broker to join a cluster.
if (getRecovery()) {
- RecoveryManagerImpl recoverer(queues, exchanges, links,
dtxManager,
+ RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
conf.stagingThreshold);
store->recover(recoverer);
}
- else
- QPID_LOG(notice, "Recovering from cluster, no recovery from local
journal");
+ else {
+ QPID_LOG(notice, "Cluster recovery: recovered journal data
discarded and journal files pushed down");
+ store->discardInit(true);
+ }
}
//ensure standard exchanges exist (done after recovery from store)
@@ -266,11 +268,11 @@
//initialize known broker urls (TODO: add support for urls for other
transports (SSL, RDMA)):
if (conf.knownHosts.empty()) {
boost::shared_ptr<ProtocolFactory> factory =
getProtocolFactory(TCP_TRANSPORT);
- if (factory) {
+ if (factory) {
knownBrokers.push_back ( qpid::Url::getIpAddressesUrl (
factory->getPort() ) );
}
} else if (conf.knownHosts != knownHostsNone) {
- knownBrokers.push_back(Url(conf.knownHosts));
+ knownBrokers.push_back(Url(conf.knownHosts));
}
}
@@ -284,14 +286,14 @@
}
-boost::intrusive_ptr<Broker> Broker::create(int16_t port)
+boost::intrusive_ptr<Broker> Broker::create(int16_t port)
{
Options config;
config.port=port;
return create(config);
}
-boost::intrusive_ptr<Broker> Broker::create(const Options& opts)
+boost::intrusive_ptr<Broker> Broker::create(const Options& opts)
{
return boost::intrusive_ptr<Broker>(new Broker(opts));
}
@@ -398,7 +400,7 @@
}
boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const
std::string& name) const {
- ProtocolFactoryMap::const_iterator i
+ ProtocolFactoryMap::const_iterator i
= name.empty() ? protocolFactories.begin() :
protocolFactories.find(name);
if (i == protocolFactories.end()) return
boost::shared_ptr<ProtocolFactory>();
else return i->second;
@@ -406,7 +408,7 @@
uint16_t Broker::getPort(const std::string& name) const {
boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(name);
- if (factory) {
+ if (factory) {
return factory->getPort();
} else {
throw NoSuchTransportException(QPID_MSG("No such transport: '" << name
<< "'"));
@@ -443,8 +445,8 @@
connect(addr->host, addr->port, TCP_TRANSPORT, failed, f);
}
-uint32_t Broker::queueMoveMessages(
- const std::string& srcQueue,
+uint32_t Broker::queueMoveMessages(
+ const std::string& srcQueue,
const std::string& destQueue,
uint32_t qty)
{
@@ -461,7 +463,7 @@
boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
-std::vector<Url>
+std::vector<Url>
Broker::getKnownBrokersImpl()
{
return knownBrokers;
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Fri May 8 14:25:34 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,14 +46,26 @@
public:
/**
- * init the store, call before any other call. If not called, store
+ * init the store, call before any other call. If not called, store
* is free to pick any defaults
- *
+ *
* @param options Options object provided by concrete store plug in.
*/
virtual bool init(const Options* options) = 0;
/**
+ * If called after init() but before recovery, will discard the database
+ * and reinitialize using an empty store dir. If the parameter
pushDownStoreFiles
+ * is true, the content of the store dir will be moved to a backup dir
inside the
+ * store dir. This is used when cluster nodes recover and must get their
content
+ * from a cluster sync rather than directly from the store.
+ *
+ * @param pushDownStoreFiles If true, will move content of the store dir
into a
+ * subdir, leaving the store dir otherwise empty.
+ */
+ virtual void discardInit(const bool pushDownStoreFiles = false) = 0;
+
+ /**
* Record the existence of a durable queue
*/
virtual void create(PersistableQueue& queue,
@@ -62,7 +74,7 @@
* Destroy a durable queue
*/
virtual void destroy(PersistableQueue& queue) = 0;
-
+
/**
* Record the existence of a durable exchange
*/
@@ -72,17 +84,17 @@
* Destroy a durable exchange
*/
virtual void destroy(const PersistableExchange& exchange) = 0;
-
+
/**
* Record a binding
*/
- virtual void bind(const PersistableExchange& exchange, const
PersistableQueue& queue,
+ virtual void bind(const PersistableExchange& exchange, const
PersistableQueue& queue,
const std::string& key, const framing::FieldTable& args)
= 0;
/**
* Forget a binding
*/
- virtual void unbind(const PersistableExchange& exchange, const
PersistableQueue& queue,
+ virtual void unbind(const PersistableExchange& exchange, const
PersistableQueue& queue,
const std::string& key, const framing::FieldTable&
args) = 0;
/**
@@ -102,10 +114,10 @@
* point). If the message has not yet been stored it will
* store the headers as well as any content passed in. A
* persistence id will be set on the message which can be
- * used to load the content or to append to it.
+ * used to load the content or to append to it.
*/
virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg) =
0;
-
+
/**
* Destroys a previously staged message. This only needs
* to be called if the message is never enqueued. (Once
@@ -119,7 +131,7 @@
*/
virtual void appendContent(const boost::intrusive_ptr<const
PersistableMessage>& msg,
const std::string& data) = 0;
-
+
/**
* Loads (a section) of content data for the specified
* message (previously stored through a call to stage or
@@ -128,18 +140,18 @@
* content should be loaded, not the headers or related
* meta-data).
*/
- virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+ virtual void loadContent(const qpid::broker::PersistableQueue& queue,
const boost::intrusive_ptr<const
PersistableMessage>& msg,
std::string& data, uint64_t offset, uint32_t
length) = 0;
-
+
/**
* Enqueues a message, storing the message if it has not
* been previously stored and recording that the given
- * message is on the given queue.
+ * message is on the given queue.
*
* Note: that this is async so the return of the function does
* not mean the opperation is complete.
- *
+ *
* @param msg the message to enqueue
* @param queue the name of the queue onto which it is to be enqueued
* @param xid (a pointer to) an identifier of the
@@ -149,7 +161,7 @@
virtual void enqueue(TransactionContext* ctxt,
const boost::intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue& queue) = 0;
-
+
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
@@ -157,7 +169,7 @@
*
* Note: that this is async so the return of the function does
* not mean the opperation is complete.
- *
+ *
* @param msg the message to dequeue
* @param queue the name of the queue from which it is to be dequeued
* @param xid (a pointer to) an identifier of the
@@ -173,22 +185,22 @@
*
* Note: that this is async so the return of the function does
* not mean the opperation is complete.
- *
+ *
* @param queue the name of the queue from which it is to be dequeued
*/
virtual void flush(const qpid::broker::PersistableQueue& queue)=0;
/**
* Returns the number of outstanding AIO's for a given queue
- *
- * If 0, than all the enqueue / dequeues have been stored
+ *
+ * If 0, than all the enqueue / dequeues have been stored
* to disk
*
* @param queue the name of the queue to check for outstanding AIO
*/
virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) = 0;
-
+
virtual ~MessageStore(){}
};
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Fri May 8
14:25:34 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -41,6 +41,11 @@
bool MessageStoreModule::init(const Options*) { return true; }
+void MessageStoreModule::discardInit(const bool pushDownStoreFiles)
+{
+ TRANSFER_EXCEPTION(store->discardInit(pushDownStoreFiles));
+}
+
void MessageStoreModule::create(PersistableQueue& queue, const FieldTable&
args)
{
TRANSFER_EXCEPTION(store->create(queue, args));
@@ -61,13 +66,13 @@
TRANSFER_EXCEPTION(store->destroy(exchange));
}
-void MessageStoreModule::bind(const PersistableExchange& e, const
PersistableQueue& q,
+void MessageStoreModule::bind(const PersistableExchange& e, const
PersistableQueue& q,
const std::string& k, const framing::FieldTable&
a)
{
TRANSFER_EXCEPTION(store->bind(e, q, k, a));
}
-void MessageStoreModule::unbind(const PersistableExchange& e, const
PersistableQueue& q,
+void MessageStoreModule::unbind(const PersistableExchange& e, const
PersistableQueue& q,
const std::string& k, const
framing::FieldTable& a)
{
TRANSFER_EXCEPTION(store->unbind(e, q, k, a));
@@ -105,7 +110,7 @@
}
void MessageStoreModule::loadContent(
- const qpid::broker::PersistableQueue& queue,
+ const qpid::broker::PersistableQueue& queue,
const intrusive_ptr<const PersistableMessage>& msg,
string& data, uint64_t offset, uint32_t length)
{
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Fri May 8
14:25:34 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,6 +40,7 @@
MessageStoreModule(MessageStore* store);
bool init(const Options* options);
+ void discardInit(const bool pushDownStoreFiles = false);
std::auto_ptr<TransactionContext> begin();
std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
void prepare(TPCTransactionContext& txn);
@@ -51,9 +52,9 @@
void destroy(PersistableQueue& queue);
void create(const PersistableExchange& exchange, const
framing::FieldTable& args);
void destroy(const PersistableExchange& exchange);
- void bind(const PersistableExchange& exchange, const PersistableQueue&
queue,
+ void bind(const PersistableExchange& exchange, const PersistableQueue&
queue,
const std::string& key, const framing::FieldTable& args);
- void unbind(const PersistableExchange& exchange, const PersistableQueue&
queue,
+ void unbind(const PersistableExchange& exchange, const PersistableQueue&
queue,
const std::string& key, const framing::FieldTable& args);
void create(const PersistableConfig& config);
void destroy(const PersistableConfig& config);
@@ -61,7 +62,7 @@
void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
void destroy(PersistableMessage& msg);
void appendContent(const boost::intrusive_ptr<const PersistableMessage>&
msg, const std::string& data);
- void loadContent(const qpid::broker::PersistableQueue& queue,
+ void loadContent(const qpid::broker::PersistableQueue& queue,
const boost::intrusive_ptr<const PersistableMessage>&
msg, std::string& data,
uint64_t offset, uint32_t length);
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Fri May 8
14:25:34 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,12 +36,12 @@
class SimpleDummyCtxt : public TransactionContext {};
-class DummyCtxt : public TPCTransactionContext
+class DummyCtxt : public TPCTransactionContext
{
const std::string xid;
public:
DummyCtxt(const std::string& _xid) : xid(_xid) {}
- static std::string getXid(TransactionContext& ctxt)
+ static std::string getXid(TransactionContext& ctxt)
{
DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
return c ? c->xid : nullxid;
@@ -54,22 +54,21 @@
bool NullMessageStore::init(const Options* /*options*/) {return true;}
+void NullMessageStore::discardInit(const bool /*pushDownStoreFiles*/) {}
+
void NullMessageStore::create(PersistableQueue& queue, const
framing::FieldTable& /*args*/)
{
queue.setPersistenceId(nextPersistenceId++);
}
-void NullMessageStore::destroy(PersistableQueue&)
-{
-}
+void NullMessageStore::destroy(PersistableQueue&) {}
void NullMessageStore::create(const PersistableExchange& exchange, const
framing::FieldTable& /*args*/)
{
exchange.setPersistenceId(nextPersistenceId++);
}
-void NullMessageStore::destroy(const PersistableExchange& )
-{}
+void NullMessageStore::destroy(const PersistableExchange& ) {}
void NullMessageStore::bind(const PersistableExchange&, const
PersistableQueue&, const std::string&, const framing::FieldTable&){}
@@ -92,7 +91,7 @@
void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&,
const intrusive_ptr<const
PersistableMessage>&,
- string&, uint64_t, uint32_t)
+ string&, uint64_t, uint32_t)
{
throw qpid::framing::InternalErrorException("Can't load content;
persistence not enabled");
}
@@ -101,7 +100,7 @@
const intrusive_ptr<PersistableMessage>& msg,
const PersistableQueue&)
{
- msg->enqueueComplete();
+ msg->enqueueComplete();
}
void NullMessageStore::dequeue(TransactionContext*,
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Fri May 8 14:25:34
2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,6 +42,7 @@
QPID_BROKER_EXTERN NullMessageStore();
QPID_BROKER_EXTERN virtual bool init(const Options* options);
+ QPID_BROKER_EXTERN virtual void discardInit(const bool pushDownStoreFiles
= false);
QPID_BROKER_EXTERN virtual std::auto_ptr<TransactionContext> begin();
QPID_BROKER_EXTERN virtual std::auto_ptr<TPCTransactionContext>
begin(const std::string& xid);
QPID_BROKER_EXTERN virtual void prepare(TPCTransactionContext& txn);
@@ -57,11 +58,11 @@
QPID_BROKER_EXTERN virtual void destroy(const PersistableExchange&
exchange);
QPID_BROKER_EXTERN virtual void bind(const PersistableExchange& exchange,
- const PersistableQueue& queue,
+ const PersistableQueue& queue,
const std::string& key,
const framing::FieldTable& args);
QPID_BROKER_EXTERN virtual void unbind(const PersistableExchange& exchange,
- const PersistableQueue& queue,
+ const PersistableQueue& queue,
const std::string& key,
const framing::FieldTable& args);
QPID_BROKER_EXTERN virtual void create(const PersistableConfig& config);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]