Author: kpvdr
Date: Fri May  8 14:25:34 2009
New Revision: 773004

URL: http://svn.apache.org/viewvc?rev=773004&view=rev
Log:
Fixed cluster store problem where second and subsequent cluster nodes (which 
are persistent) to join a cluster fail with a "Exchange already exists: 
amq.direct (MessageStoreImpl.cpp:488)" message. To do this a new method was 
added to MessageStore called discardInit() which will throw away all restored 
data (if any) and restart as though no restore had taken place.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Fri May  8 14:25:34 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -114,11 +114,11 @@
         ("staging-threshold", optValue(stagingThreshold, "N"), "Stages 
messages over N bytes to disk")
         ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
         ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), 
"Management Publish Interval")
-        ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), 
+        ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"),
          "Interval between attempts to purge any expired messages from queues")
         ("auth", optValue(auth, "yes|no"), "Enable authentication, if disabled 
all incoming connections will be trusted")
         ("realm", optValue(realm, "REALM"), "Use the given realm when 
performing authentication")
-        ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default 
maximum size for queues (in bytes)") 
+        ("default-queue-limit", optValue(queueLimit, "BYTES"), "Default 
maximum size for queues (in bytes)")
         ("tcp-nodelay", optValue(tcpNoDelay), "Set TCP_NODELAY on TCP 
connections")
         ("require-encryption", optValue(requireEncrypted), "Only accept 
connections that are encrypted")
         ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to 
send as 'known-hosts' to clients ('none' implies empty list)")
@@ -176,7 +176,7 @@
             mgmtObject->set_dataDir(dataDir.getPath());
         else
             mgmtObject->clr_dataDir();
-        
+
         managementAgent->addObject(mgmtObject, 0x1000000000000002LL);
 
         // Since there is currently no support for virtual hosts, a 
placeholder object
@@ -218,12 +218,14 @@
         // The cluster plug-in will setRecovery(false) on all but the first
         // broker to join a cluster.
         if (getRecovery()) {
-            RecoveryManagerImpl recoverer(queues, exchanges, links, 
dtxManager, 
+            RecoveryManagerImpl recoverer(queues, exchanges, links, dtxManager,
                                           conf.stagingThreshold);
             store->recover(recoverer);
         }
-        else
-            QPID_LOG(notice, "Recovering from cluster, no recovery from local 
journal");
+        else {
+            QPID_LOG(notice, "Cluster recovery: recovered journal data 
discarded and journal files pushed down");
+            store->discardInit(true);
+        }
     }
 
     //ensure standard exchanges exist (done after recovery from store)
@@ -266,11 +268,11 @@
     //initialize known broker urls (TODO: add support for urls for other 
transports (SSL, RDMA)):
     if (conf.knownHosts.empty()) {
         boost::shared_ptr<ProtocolFactory> factory = 
getProtocolFactory(TCP_TRANSPORT);
-        if (factory) { 
+        if (factory) {
             knownBrokers.push_back ( qpid::Url::getIpAddressesUrl ( 
factory->getPort() ) );
         }
     } else if (conf.knownHosts != knownHostsNone) {
-        knownBrokers.push_back(Url(conf.knownHosts));        
+        knownBrokers.push_back(Url(conf.knownHosts));
     }
 }
 
@@ -284,14 +286,14 @@
 }
 
 
-boost::intrusive_ptr<Broker> Broker::create(int16_t port) 
+boost::intrusive_ptr<Broker> Broker::create(int16_t port)
 {
     Options config;
     config.port=port;
     return create(config);
 }
 
-boost::intrusive_ptr<Broker> Broker::create(const Options& opts) 
+boost::intrusive_ptr<Broker> Broker::create(const Options& opts)
 {
     return boost::intrusive_ptr<Broker>(new Broker(opts));
 }
@@ -398,7 +400,7 @@
 }
 
 boost::shared_ptr<ProtocolFactory> Broker::getProtocolFactory(const 
std::string& name) const {
-    ProtocolFactoryMap::const_iterator i 
+    ProtocolFactoryMap::const_iterator i
         = name.empty() ? protocolFactories.begin() : 
protocolFactories.find(name);
     if (i == protocolFactories.end()) return 
boost::shared_ptr<ProtocolFactory>();
     else return i->second;
@@ -406,7 +408,7 @@
 
 uint16_t Broker::getPort(const std::string& name) const  {
     boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(name);
-    if (factory) { 
+    if (factory) {
         return factory->getPort();
     } else {
         throw NoSuchTransportException(QPID_MSG("No such transport: '" << name 
<< "'"));
@@ -443,8 +445,8 @@
     connect(addr->host, addr->port, TCP_TRANSPORT, failed, f);
 }
 
-uint32_t Broker::queueMoveMessages( 
-     const std::string& srcQueue, 
+uint32_t Broker::queueMoveMessages(
+     const std::string& srcQueue,
      const std::string& destQueue,
      uint32_t  qty)
 {
@@ -461,7 +463,7 @@
 
 boost::shared_ptr<sys::Poller> Broker::getPoller() { return poller; }
 
-std::vector<Url> 
+std::vector<Url>
 Broker::getKnownBrokersImpl()
 {
     return knownBrokers;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Fri May  8 14:25:34 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -46,14 +46,26 @@
   public:
 
     /**
-     * init the store, call before any other call. If not called, store 
+     * init the store, call before any other call. If not called, store
      * is free to pick any defaults
-     * 
+     *
      * @param options Options object provided by concrete store plug in.
      */
     virtual bool init(const Options* options) = 0;
 
     /**
+     * If called after init() but before recovery, will discard the database
+     * and reinitialize using an empty store dir. If the parameter 
pushDownStoreFiles
+     * is true, the content of the store dir will be moved to a backup dir 
inside the
+     * store dir. This is used when cluster nodes recover and must get thier 
content
+     * from a cluster sync rather than directly fromt the store.
+     *
+     * @param pushDownStoreFiles If true, will move content of the store dir 
into a
+     *                           subdir, leaving the store dir otherwise empty.
+     */
+    virtual void discardInit(const bool pushDownStoreFiles = false) = 0;
+
+    /**
      * Record the existence of a durable queue
      */
     virtual void create(PersistableQueue& queue,
@@ -62,7 +74,7 @@
      * Destroy a durable queue
      */
     virtual void destroy(PersistableQueue& queue) = 0;
-    
+
     /**
      * Record the existence of a durable exchange
      */
@@ -72,17 +84,17 @@
      * Destroy a durable exchange
      */
     virtual void destroy(const PersistableExchange& exchange) = 0;
-    
+
     /**
      * Record a binding
      */
-    virtual void bind(const PersistableExchange& exchange, const 
PersistableQueue& queue, 
+    virtual void bind(const PersistableExchange& exchange, const 
PersistableQueue& queue,
                       const std::string& key, const framing::FieldTable& args) 
= 0;
 
     /**
      * Forget a binding
      */
-    virtual void unbind(const PersistableExchange& exchange, const 
PersistableQueue& queue, 
+    virtual void unbind(const PersistableExchange& exchange, const 
PersistableQueue& queue,
                         const std::string& key, const framing::FieldTable& 
args) = 0;
 
     /**
@@ -102,10 +114,10 @@
      * point). If the message has not yet been stored it will
      * store the headers as well as any content passed in. A
      * persistence id will be set on the message which can be
-     * used to load the content or to append to it. 
+     * used to load the content or to append to it.
      */
     virtual void stage(const boost::intrusive_ptr<PersistableMessage>& msg) = 
0;
-            
+
     /**
      * Destroys a previously staged message. This only needs
      * to be called if the message is never enqueued. (Once
@@ -119,7 +131,7 @@
      */
     virtual void appendContent(const boost::intrusive_ptr<const 
PersistableMessage>& msg,
                                const std::string& data) = 0;
-    
+
     /**
      * Loads (a section) of content data for the specified
      * message (previously stored through a call to stage or
@@ -128,18 +140,18 @@
      * content should be loaded, not the headers or related
      * meta-data).
      */
-    virtual void loadContent(const qpid::broker::PersistableQueue& queue, 
+    virtual void loadContent(const qpid::broker::PersistableQueue& queue,
                              const boost::intrusive_ptr<const 
PersistableMessage>& msg,
                              std::string& data, uint64_t offset, uint32_t 
length) = 0;
-    
+
     /**
      * Enqueues a message, storing the message if it has not
      * been previously stored and recording that the given
-     * message is on the given queue. 
+     * message is on the given queue.
      *
      * Note: that this is async so the return of the function does
      * not mean the opperation is complete.
-     * 
+     *
      * @param msg the message to enqueue
      * @param queue the name of the queue onto which it is to be enqueued
      * @param xid (a pointer to) an identifier of the
@@ -149,7 +161,7 @@
     virtual void enqueue(TransactionContext* ctxt,
                          const boost::intrusive_ptr<PersistableMessage>& msg,
                          const PersistableQueue& queue) = 0;
-    
+
     /**
      * Dequeues a message, recording that the given message is
      * no longer on the given queue and deleting the message
@@ -157,7 +169,7 @@
      *
      * Note: that this is async so the return of the function does
      * not mean the opperation is complete.
-     * 
+     *
      * @param msg the message to dequeue
      * @param queue the name of the queue from which it is to be dequeued
      * @param xid (a pointer to) an identifier of the
@@ -173,22 +185,22 @@
      *
      * Note: that this is async so the return of the function does
      * not mean the opperation is complete.
-     * 
+     *
      * @param queue the name of the queue from which it is to be dequeued
      */
     virtual void flush(const qpid::broker::PersistableQueue& queue)=0;
 
     /**
      * Returns the number of outstanding AIO's for a given queue
-     * 
-     * If 0, than all the enqueue / dequeues have been stored 
+     *
+     * If 0, than all the enqueue / dequeues have been stored
      * to disk
      *
      * @param queue the name of the queue to check for outstanding AIO
      */
     virtual uint32_t outstandingQueueAIO(const PersistableQueue& queue) = 0;
 
-    
+
     virtual ~MessageStore(){}
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Fri May  8 
14:25:34 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -41,6 +41,11 @@
 
 bool MessageStoreModule::init(const Options*) { return true; }
 
+void MessageStoreModule::discardInit(const bool pushDownStoreFiles)
+{
+    TRANSFER_EXCEPTION(store->discardInit(pushDownStoreFiles));
+}
+
 void MessageStoreModule::create(PersistableQueue& queue, const FieldTable& 
args)
 {
     TRANSFER_EXCEPTION(store->create(queue, args));
@@ -61,13 +66,13 @@
     TRANSFER_EXCEPTION(store->destroy(exchange));
 }
 
-void MessageStoreModule::bind(const PersistableExchange& e, const 
PersistableQueue& q, 
+void MessageStoreModule::bind(const PersistableExchange& e, const 
PersistableQueue& q,
                               const std::string& k, const framing::FieldTable& 
a)
 {
     TRANSFER_EXCEPTION(store->bind(e, q, k, a));
 }
 
-void MessageStoreModule::unbind(const PersistableExchange& e, const 
PersistableQueue& q, 
+void MessageStoreModule::unbind(const PersistableExchange& e, const 
PersistableQueue& q,
                                 const std::string& k, const 
framing::FieldTable& a)
 {
     TRANSFER_EXCEPTION(store->unbind(e, q, k, a));
@@ -105,7 +110,7 @@
 }
 
 void MessageStoreModule::loadContent(
-    const qpid::broker::PersistableQueue& queue, 
+    const qpid::broker::PersistableQueue& queue,
     const intrusive_ptr<const PersistableMessage>& msg,
     string& data, uint64_t offset, uint32_t length)
 {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Fri May  8 
14:25:34 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,6 +40,7 @@
     MessageStoreModule(MessageStore* store);
 
     bool init(const Options* options);
+    void discardInit(const bool pushDownStoreFiles = false);
     std::auto_ptr<TransactionContext> begin();
     std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
     void prepare(TPCTransactionContext& txn);
@@ -51,9 +52,9 @@
     void destroy(PersistableQueue& queue);
     void create(const PersistableExchange& exchange, const 
framing::FieldTable& args);
     void destroy(const PersistableExchange& exchange);
-    void bind(const PersistableExchange& exchange, const PersistableQueue& 
queue, 
+    void bind(const PersistableExchange& exchange, const PersistableQueue& 
queue,
               const std::string& key, const framing::FieldTable& args);
-    void unbind(const PersistableExchange& exchange, const PersistableQueue& 
queue, 
+    void unbind(const PersistableExchange& exchange, const PersistableQueue& 
queue,
                 const std::string& key, const framing::FieldTable& args);
     void create(const PersistableConfig& config);
     void destroy(const PersistableConfig& config);
@@ -61,7 +62,7 @@
     void stage(const boost::intrusive_ptr<PersistableMessage>& msg);
     void destroy(PersistableMessage& msg);
     void appendContent(const boost::intrusive_ptr<const PersistableMessage>& 
msg, const std::string& data);
-    void loadContent(const qpid::broker::PersistableQueue& queue, 
+    void loadContent(const qpid::broker::PersistableQueue& queue,
                      const boost::intrusive_ptr<const PersistableMessage>& 
msg, std::string& data,
                      uint64_t offset, uint32_t length);
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Fri May  8 
14:25:34 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -36,12 +36,12 @@
 
 class SimpleDummyCtxt : public TransactionContext {};
 
-class DummyCtxt : public TPCTransactionContext 
+class DummyCtxt : public TPCTransactionContext
 {
     const std::string xid;
 public:
     DummyCtxt(const std::string& _xid) : xid(_xid) {}
-    static std::string getXid(TransactionContext& ctxt) 
+    static std::string getXid(TransactionContext& ctxt)
     {
         DummyCtxt* c(dynamic_cast<DummyCtxt*>(&ctxt));
         return c ? c->xid : nullxid;
@@ -54,22 +54,21 @@
 
 bool NullMessageStore::init(const Options* /*options*/) {return true;}
 
+void NullMessageStore::discardInit(const bool /*pushDownStoreFiles*/) {}
+
 void NullMessageStore::create(PersistableQueue& queue, const 
framing::FieldTable& /*args*/)
 {
     queue.setPersistenceId(nextPersistenceId++);
 }
 
-void NullMessageStore::destroy(PersistableQueue&)
-{
-}
+void NullMessageStore::destroy(PersistableQueue&) {}
 
 void NullMessageStore::create(const PersistableExchange& exchange, const 
framing::FieldTable& /*args*/)
 {
     exchange.setPersistenceId(nextPersistenceId++);
 }
 
-void NullMessageStore::destroy(const PersistableExchange& )
-{}
+void NullMessageStore::destroy(const PersistableExchange& ) {}
 
 void NullMessageStore::bind(const PersistableExchange&, const 
PersistableQueue&, const std::string&, const framing::FieldTable&){}
 
@@ -92,7 +91,7 @@
 
 void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&,
                                    const intrusive_ptr<const 
PersistableMessage>&,
-                                   string&, uint64_t, uint32_t) 
+                                   string&, uint64_t, uint32_t)
 {
     throw qpid::framing::InternalErrorException("Can't load content; 
persistence not enabled");
 }
@@ -101,7 +100,7 @@
                                const intrusive_ptr<PersistableMessage>& msg,
                                const PersistableQueue&)
 {
-    msg->enqueueComplete(); 
+    msg->enqueueComplete();
 }
 
 void NullMessageStore::dequeue(TransactionContext*,

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=773004&r1=773003&r2=773004&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Fri May  8 14:25:34 
2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,6 +42,7 @@
     QPID_BROKER_EXTERN NullMessageStore();
 
     QPID_BROKER_EXTERN virtual bool init(const Options* options);
+    QPID_BROKER_EXTERN virtual void discardInit(const bool pushDownStoreFiles 
= false);
     QPID_BROKER_EXTERN virtual std::auto_ptr<TransactionContext> begin();
     QPID_BROKER_EXTERN virtual std::auto_ptr<TPCTransactionContext> 
begin(const std::string& xid);
     QPID_BROKER_EXTERN virtual void prepare(TPCTransactionContext& txn);
@@ -57,11 +58,11 @@
     QPID_BROKER_EXTERN virtual void destroy(const PersistableExchange& 
exchange);
 
     QPID_BROKER_EXTERN virtual void bind(const PersistableExchange& exchange,
-                                         const PersistableQueue& queue, 
+                                         const PersistableQueue& queue,
                                          const std::string& key,
                                          const framing::FieldTable& args);
     QPID_BROKER_EXTERN virtual void unbind(const PersistableExchange& exchange,
-                                           const PersistableQueue& queue, 
+                                           const PersistableQueue& queue,
                                            const std::string& key,
                                            const framing::FieldTable& args);
     QPID_BROKER_EXTERN virtual void create(const PersistableConfig& config);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to