Author: aconway
Date: Thu Jun 21 16:05:09 2012
New Revision: 1352588

URL: http://svn.apache.org/viewvc?rev=1352588&view=rev
Log:
QPID-4082: cluster de-sync after broker restart & queue replication

With queue state replication configured between two clusters, restarting a
broker in both the source and destination clusters sometimes leads to cluster
de-sync. No QMF
communication is involved, though symptoms are similar to the bug caused by
missing propagation of QMF errors within a cluster.

The bug is caused by "deliveryCount" in SemanticState::ConsumerImpl
(qpid/broker/SemanticState.cpp) not being replicated to a joining cluster node
during catch-up. When the elder broker in src.cluster sends session.sync() after
sending 5 messages (per --ack 5 in qpid-route), the recently joined node in
src.cluster does not do so, which leads to the cluster de-sync.

The patch:

 - adds a new property, deliveryCount, to the "consumer-state" method (see the
xml/cluster.xml file change) that is used to update a new joiner
 - updates cluster::UpdateClient::updateConsumer to send deliveryCount to the 
method
 - updates cluster::Connection::consumerState to set the received deliveryCount
 - adds two methods to broker::SemanticState::ConsumerImpl for getting and 
setting deliveryCount

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1352588&r1=1352587&r2=1352588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Thu Jun 21 16:05:09 2012
@@ -146,6 +146,8 @@ class SemanticState : private boost::non
         std::string getResumeId() const { return resumeId; };
         const std::string& getTag() const { return tag; }
         uint64_t getResumeTtl() const { return resumeTtl; }
+       uint32_t getDeliveryCount() const { return deliveryCount; }
+       void setDeliveryCount(uint32_t _deliveryCount) { deliveryCount = 
_deliveryCount; }
         const framing::FieldTable& getArguments() const { return arguments; }
 
         SemanticState& getParent() { return *parent; }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1352588&r1=1352587&r2=1352588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Jun 21 16:05:09 2012
@@ -406,11 +406,12 @@ void Connection::shadowSetUser(const std
 }
 
 void Connection::consumerState(const string& name, bool blocked, bool 
notifyEnabled, const SequenceNumber& position,
-                               uint32_t usedMsgCredit, uint32_t usedByteCredit)
+                               uint32_t usedMsgCredit, uint32_t 
usedByteCredit, const uint32_t deliveryCount)
 {
     broker::SemanticState::ConsumerImpl::shared_ptr c = 
semanticState().find(name);
     c->setPosition(position);
     c->setBlocked(blocked);
+    c->setDeliveryCount(deliveryCount);
     if (c->getCredit().isWindowMode()) c->getCredit().consume(usedMsgCredit, 
usedByteCredit);
     if (notifyEnabled) c->enableNotify(); else c->disableNotify();
     updateIn.consumerNumbering.add(c);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1352588&r1=1352587&r2=1352588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Jun 21 16:05:09 2012
@@ -110,7 +110,7 @@ class Connection :
     void deliveredFrame(const EventFrame&);
 
     void consumerState(const std::string& name, bool blocked, bool 
notifyEnabled, const qpid::framing::SequenceNumber& position,
-                       uint32_t usedMsgCredit, uint32_t usedByteCredit);
+                       uint32_t usedMsgCredit, uint32_t usedByteCredit, const 
uint32_t deliveryCount);
 
     // ==== Used in catch-up mode to build initial state.
     //

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1352588&r1=1352587&r2=1352588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Thu Jun 21 16:05:09 
2012
@@ -547,7 +547,8 @@ void UpdateClient::updateConsumer(
         ci->isNotifyEnabled(),
         ci->getPosition(),
         ci->getCredit().used().messages,
-        ci->getCredit().used().bytes
+        ci->getCredit().used().bytes,
+       ci->getDeliveryCount()
     );
     consumerNumbering.add(ci.get());
 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1352588&r1=1352587&r2=1352588&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Thu Jun 21 16:05:09 2012
@@ -179,6 +179,7 @@
       <field name="position" type="sequence-no"/>
       <field name="used-msg-credit" type="uint32"/>
       <field name="used-byte-credit" type="uint32"/>
+      <field name="deliveryCount" type="uint32"/>
     </control>
 
     <!-- Delivery-record for outgoing messages sent but not yet accepted. -->



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to