Author: kgiusti
Date: Thu Apr 28 12:25:59 2011
New Revision: 1097432

URL: http://svn.apache.org/viewvc?rev=1097432&view=rev
Log:
QPID-3076: enable flow control for clustered broker configurations.

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h
Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Apr 28 12:25:59 2011
@@ -248,13 +248,7 @@ Broker::Broker(const Broker::Options& co
     // Early-Initialize plugins
     Plugin::earlyInitAll(*this);
 
-    /** todo KAG - remove once cluster support for flow control done */
-    if (isInCluster()) {
-        QPID_LOG(info, "Producer Flow Control TBD for clustered brokers - 
queue flow control disabled by default.");
-        QueueFlowLimit::setDefaults(0, 0, 0);
-    } else {
-        QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, 
conf.queueFlowResumeRatio);
-    }
+    QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, 
conf.queueFlowResumeRatio);
 
     // If no plugin store module registered itself, set up the null store.
     if (NullMessageStore::isNullStore(store.get()))

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Apr 28 12:25:59 2011
@@ -348,6 +348,11 @@ class Queue : public boost::enable_share
         bindings.eachBinding(f);
     }
 
+    /** Apply f to each Observer on the queue */
+    template <class F> void eachObserver(F f) {
+        std::for_each<Observers::iterator, F>(observers.begin(), 
observers.end(), f);
+    }
+
     /** Set the position sequence number  for the next message on the queue.
      * Must be >= the current sequence number.
      * Used by cluster to replicate queues.

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Thu Apr 28 12:25:59 
2011
@@ -92,7 +92,7 @@ namespace {
 QueueFlowLimit::QueueFlowLimit(Queue *_queue,
                                uint32_t _flowStopCount, uint32_t 
_flowResumeCount,
                                uint64_t _flowStopSize,  uint64_t 
_flowResumeSize)
-    : queue(_queue), queueName("<unknown>"),
+    : StatefulQueueObserver(std::string("QueueFlowLimit")), queue(_queue), 
queueName("<unknown>"),
       flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount),
       flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
       flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0)
@@ -123,8 +123,6 @@ QueueFlowLimit::QueueFlowLimit(Queue *_q
 
 void QueueFlowLimit::enqueued(const QueuedMessage& msg)
 {
-    if (!msg.payload) return;
-
     sys::Mutex::ScopedLock l(indexLock);
 
     ++count;
@@ -152,7 +150,9 @@ void QueueFlowLimit::enqueued(const Queu
         }
         QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control 
for msg pos=" << msg.position);
         msg.payload->getIngressCompletion().startCompleter();    // don't 
complete until flow resumes
-        index.insert(msg.payload);
+        bool unique;
+        unique = index.insert(std::pair<framing::SequenceNumber, 
boost::intrusive_ptr<Message> >(msg.position, msg.payload)).second;
+        assert(unique);
     }
 }
 
@@ -160,8 +160,6 @@ void QueueFlowLimit::enqueued(const Queu
 
 void QueueFlowLimit::dequeued(const QueuedMessage& msg)
 {
-    if (!msg.payload) return;
-
     sys::Mutex::ScopedLock l(indexLock);
 
     if (count > 0) {
@@ -189,16 +187,16 @@ void QueueFlowLimit::dequeued(const Queu
     if (!index.empty()) {
         if (!flowStopped) {
             // flow enabled - release all pending msgs
-            while (!index.empty()) {
-                std::set< boost::intrusive_ptr<Message> >::iterator itr = 
index.begin();
-                (*itr)->getIngressCompletion().finishCompleter();
-                index.erase(itr);
-            }
+            for (std::map<framing::SequenceNumber, 
boost::intrusive_ptr<Message> >::iterator itr = index.begin();
+                 itr != index.end(); ++itr)
+                if (itr->second)
+                    itr->second->getIngressCompletion().finishCompleter();
+            index.clear();
         } else {
             // even if flow controlled, we must release this msg as it is 
being dequeued
-            std::set< boost::intrusive_ptr<Message> >::iterator itr = 
index.find(msg.payload);
+            std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> 
>::iterator itr = index.find(msg.position);
             if (itr != index.end()) {       // this msg is flow controlled, 
release it:
-                (*itr)->getIngressCompletion().finishCompleter();
+                msg.payload->getIngressCompletion().finishCompleter();
                 index.erase(itr);
             }
         }
@@ -206,34 +204,6 @@ void QueueFlowLimit::dequeued(const Queu
 }
 
 
-/** used by clustering: is the given message's completion blocked due to flow
- * control?  True if message is blocked. (for the clustering updater: done
- * after msgs have been replicated to the updatee).
- */
-bool QueueFlowLimit::getState(const QueuedMessage& msg) const
-{
-    sys::Mutex::ScopedLock l(indexLock);
-    return (index.find(msg.payload) != index.end());
-}
-
-
-/** artificially force the flow control state of a given message
- * (for the clustering updatee: done after msgs have been replicated to
- * the updatee's queue)
- */
-void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked)
-{
-    if (blocked && msg.payload) {
-
-        sys::Mutex::ScopedLock l(indexLock);
-        assert(index.find(msg.payload) == index.end());
-
-        QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow 
control for msg pos=" << msg.position << " for CLUSTER SYNC");
-        index.insert(msg.payload);
-    }
-}
-
-
 void QueueFlowLimit::encode(Buffer& buffer) const
 {
   buffer.putLong(flowStopCount);
@@ -281,7 +251,7 @@ void QueueFlowLimit::setDefaults(uint64_
     defaultFlowStopRatio = flowStopRatio;
     defaultFlowResumeRatio = flowResumeRatio;
 
-    /** @todo Verify valid range on Broker::Options instead of here */
+    /** @todo KAG: Verify valid range on Broker::Options instead of here */
     if (flowStopRatio > 100 || flowResumeRatio > 100)
         throw InvalidArgumentException(QPID_MSG("Default queue flow ratios 
must be between 0 and 100, inclusive:"
                                                 << " flowStopRatio=" << 
flowStopRatio
@@ -320,14 +290,6 @@ QueueFlowLimit *QueueFlowLimit::createLi
         if (flowStopCount == 0 && flowStopSize == 0) {   // disable flow 
control
             return 0;
         }
-        /** @todo KAG - remove once cluster support for flow control done. */
-        // TODO aconway 2011-02-16: is queue==0 only in tests?
-        // TODO kgiusti 2011-02-19: yes!  The unit tests test this class in 
isolation */
-        if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
-            QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers 
- queue flow control disabled for queue "
-                     << queue->getName());
-            return 0;
-        }
         return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, 
flowStopSize, flowResumeSize);
     }
 
@@ -335,17 +297,76 @@ QueueFlowLimit *QueueFlowLimit::createLi
         uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, 
defaultMaxSize);
         uint64_t flowStopSize = (uint64_t)(maxByteCount * 
(defaultFlowStopRatio/100.0) + 0.5);
         uint64_t flowResumeSize = (uint64_t)(maxByteCount * 
(defaultFlowResumeRatio/100.0));
+        return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
+    }
+    return 0;
+}
 
-        /** todo KAG - remove once cluster support for flow control done. */
-        if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) {
-            QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers 
- queue flow control disabled for queue "
-                     << queue->getName());
-            return 0;
+/* Cluster replication */
+
+namespace {
+    /** pack a set of sequence number ranges into a framing::Array */
+    void buildSeqRangeArray(qpid::framing::Array *seqs,
+                            const qpid::framing::SequenceNumber first,
+                            const qpid::framing::SequenceNumber last)
+    {
+        seqs->push_back(qpid::framing::Array::ValuePtr(new 
Unsigned32Value(first)));
+        seqs->push_back(qpid::framing::Array::ValuePtr(new 
Unsigned32Value(last)));
+    }
+}
+
+/** Runs on UPDATER to snapshot current state */
+void QueueFlowLimit::getState(qpid::framing::FieldTable& state ) const
+{
+    sys::Mutex::ScopedLock l(indexLock);
+    state.clear();
+
+    framing::SequenceSet ss;
+    if (!index.empty()) {
+        /* replicate the set of messages pending flow control */
+        for (std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> 
>::const_iterator itr = index.begin();
+             itr != index.end(); ++itr) {
+            ss.add(itr->first);
         }
+        framing::Array seqs(TYPE_CODE_UINT32);
+        ss.for_each(boost::bind(&buildSeqRangeArray, &seqs, _1, _2));
+        state.setArray("pendingMsgSeqs", seqs);
+    }
+    QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicating 
pending msgs, range=" << ss);
+}
 
-        return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize);
+
+/** called on UPDATEE to set state from snapshot */
+void QueueFlowLimit::setState(const qpid::framing::FieldTable& state)
+{
+    sys::Mutex::ScopedLock l(indexLock);
+    index.clear();
+
+    framing::SequenceSet fcmsg;
+    framing::Array seqArray(TYPE_CODE_UINT32);
+    if (state.getArray("pendingMsgSeqs", seqArray)) {
+        assert((seqArray.count() & 0x01) == 0); // must be even since they are 
sequence ranges
+        framing::Array::const_iterator i = seqArray.begin();
+        while (i != seqArray.end()) {
+            framing::SequenceNumber first((*i)->getIntegerValue<uint32_t, 
4>());
+            ++i;
+            framing::SequenceNumber last((*i)->getIntegerValue<uint32_t, 4>());
+            ++i;
+            fcmsg.add(first, last);
+            for (SequenceNumber seq = first; seq <= last; ++seq) {
+                QueuedMessage msg(queue->find(seq));   // fyi: msg.payload may 
be null if msg is delivered & unacked
+                bool unique;
+                unique = index.insert(std::pair<framing::SequenceNumber, 
boost::intrusive_ptr<Message> >(seq, msg.payload)).second;
+                assert(unique);
+            }
+        }
     }
-    return 0;
+
+    flowStopped = index.size() != 0;
+    if (queueMgmtObj) {
+        queueMgmtObj->set_flowStopped(isFlowControlActive());
+    }
+    QPID_LOG(debug, "Queue \"" << queueName << "\": flow limit replicated the 
pending msgs, range=" << fcmsg)
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.h Thu Apr 28 12:25:59 
2011
@@ -27,7 +27,7 @@
 #include <memory>
 #include "qpid/broker/BrokerImportExport.h"
 #include "qpid/broker/QueuedMessage.h"
-#include "qpid/broker/QueueObserver.h"
+#include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Mutex.h"
@@ -53,7 +53,7 @@ class Broker;
  * passing _either_ level may turn flow control ON, but _both_ must be
  * below level before flow control will be turned OFF.
  */
- class QueueFlowLimit : public QueueObserver
+ class QueueFlowLimit : public StatefulQueueObserver
 {
     static uint64_t defaultMaxSize;
     static uint defaultFlowStopRatio;
@@ -86,9 +86,8 @@ class Broker;
     QPID_BROKER_EXTERN void dequeued(const QueuedMessage&);
 
     /** for clustering: */
-    /** true if the given message is flow controlled, and cannot be completed. 
*/
-    bool getState(const QueuedMessage&) const;
-    void setState(const QueuedMessage&, bool blocked);
+    QPID_BROKER_EXTERN void getState(qpid::framing::FieldTable&) const;
+    QPID_BROKER_EXTERN void setState(const qpid::framing::FieldTable&);
 
     uint32_t getFlowStopCount() const { return flowStopCount; }
     uint32_t getFlowResumeCount() const { return flowResumeCount; }
@@ -111,7 +110,7 @@ class Broker;
 
  protected:
     // msgs waiting for flow to become available.
-    std::set< boost::intrusive_ptr<Message> > index;
+    std::map<framing::SequenceNumber, boost::intrusive_ptr<Message> > index;
     mutable qpid::sys::Mutex indexLock;
 
     _qmfBroker::Queue *queueMgmtObj;

Added: qpid/trunk/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h?rev=1097432&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/StatefulQueueObserver.h Thu Apr 28 
12:25:59 2011
@@ -0,0 +1,63 @@
+#ifndef QPID_BROKER_STATEFULQUEUEOBSERVER_H
+#define QPID_BROKER_STATEFULQUEUEOBSERVER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/QueueObserver.h"
+#include "qpid/framing/FieldTable.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Specialized type of QueueObserver that maintains internal state that has to
+ * be replicated across clustered brokers.
+ */
+class StatefulQueueObserver : public QueueObserver
+{
+  public:
+    StatefulQueueObserver(std::string _id) : id(_id) {}
+    virtual ~StatefulQueueObserver() {}
+
+    /** This identifier must uniquely identify this particular observer amoung
+     * all observers on a queue.  For cluster replication, this id will be used
+     * to identify the peer queue observer for synchronization across
+     * brokers.
+     */
+    const std::string& getId() const { return id; }
+
+    /** This method should return the observer's internal state as an opaque
+     * map.
+     */
+    virtual void getState(qpid::framing::FieldTable& state ) const = 0;
+
+    /** The input map represents the internal state of the peer observer that
+     * this observer should synchonize to.
+     */
+    virtual void setState(const qpid::framing::FieldTable&) = 0;
+
+
+  private:
+    std::string id;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_STATEFULQUEUEOBSERVER_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Apr 28 12:25:59 2011
@@ -198,7 +198,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 1058747;
+const uint32_t Cluster::CLUSTER_VERSION = 1097431;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Apr 28 12:25:59 2011
@@ -35,6 +35,7 @@
 #include "qpid/broker/Fairshare.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Bridge.h"
+#include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/AMQFrame.h"
@@ -558,6 +559,48 @@ void Connection::queueFairshareState(con
     }
 }
 
+
+namespace {
+    // find a StatefulQueueObserver that matches a given identifier
+    class ObserverFinder {
+        const std::string id;
+        boost::shared_ptr<broker::QueueObserver> target;
+        ObserverFinder(const ObserverFinder&) {}
+    public:
+        ObserverFinder(const std::string& _id) : id(_id) {}
+        broker::StatefulQueueObserver *getObserver()
+        {
+            if (target)
+                return dynamic_cast<broker::StatefulQueueObserver 
*>(target.get());
+            return 0;
+        }
+        void operator() (boost::shared_ptr<broker::QueueObserver> o)
+        {
+            if (!target) {
+                broker::StatefulQueueObserver *p = 
dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+                if (p && p->getId() == id) {
+                    target = o;
+                }
+            }
+        }
+    };
+}
+
+
+void Connection::queueObserverState(const std::string& qname, const 
std::string& observerId, const FieldTable& state)
+{
+    boost::shared_ptr<broker::Queue> queue(findQueue(qname));
+    ObserverFinder finder(observerId);      // find this observer
+    queue->eachObserver<ObserverFinder &>(finder);
+    broker::StatefulQueueObserver *so = finder.getObserver();
+    if (so) {
+        so->setState( state );
+        QPID_LOG(debug, "updated queue observer " << observerId << "'s state 
on queue " << qname << "; ...");
+        return;
+    }
+    QPID_LOG(error, "Failed to find observer " << observerId << " state on 
queue " << qname << "; this will result in inconsistencies.");
+}
+
 void Connection::expiryId(uint64_t id) {
     cluster.getExpiryPolicy().setId(id);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Apr 28 12:25:59 2011
@@ -153,6 +153,7 @@ class Connection :
 
     void queuePosition(const std::string&, const framing::SequenceNumber&);
     void queueFairshareState(const std::string&, const uint8_t priority, const 
uint8_t count);
+    void queueObserverState(const std::string&, const std::string&, const 
framing::FieldTable&);
     void expiryId(uint64_t);
 
     void txStart();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Thu Apr 28 12:25:59 
2011
@@ -49,6 +49,7 @@
 #include "qpid/broker/TxPublish.h"
 #include "qpid/broker/RecoveredDequeue.h"
 #include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/StatefulQueueObserver.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/ClusterConnectionMembershipBody.h"
 #include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -167,6 +168,9 @@ void UpdateClient::update() {
                   boost::bind(&UpdateClient::updateConnection, this, _1));
     session.queueDelete(arg::queue=UPDATE);
 
+    // some Queue Observers need session state & msgs synced first, so sync 
observers now
+    b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueObservers, 
this, _1));
+
     // Update queue listeners: must come after sessions so consumerNumbering 
is populated
     b.getQueues().eachQueue(boost::bind(&UpdateClient::updateQueueListeners, 
this, _1));
 
@@ -615,4 +619,23 @@ void UpdateClient::updateBridge(const bo
     ClusterConnectionProxy(session).config(encode(*bridge));
 }
 
+void UpdateClient::updateQueueObservers(const 
boost::shared_ptr<broker::Queue>& q)
+{
+    q->eachObserver(boost::bind(&UpdateClient::updateObserver, this, q, _1));
+}
+
+void UpdateClient::updateObserver(const boost::shared_ptr<broker::Queue>& q,
+                                        
boost::shared_ptr<broker::QueueObserver> o)
+{
+    qpid::framing::FieldTable state;
+    broker::StatefulQueueObserver *so = 
dynamic_cast<broker::StatefulQueueObserver *>(o.get());
+    if (so) {
+        so->getState( state );
+        std::string id(so->getId());
+        QPID_LOG(debug, *this << " updating queue " << q->getName() << "'s 
observer " << id);
+        ClusterConnectionProxy(session).queueObserverState( q->getName(), id, 
state );
+    }
+}
+
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Thu Apr 28 12:25:59 2011
@@ -51,6 +51,7 @@ class SemanticState;
 class Decoder;
 class Link;
 class Bridge;
+class QueueObserver;
 
 } // namespace broker
 
@@ -104,6 +105,8 @@ class UpdateClient : public sys::Runnabl
     void updateLinks();
     void updateLink(const boost::shared_ptr<broker::Link>&);
     void updateBridge(const boost::shared_ptr<broker::Bridge>&);
+    void updateQueueObservers(const boost::shared_ptr<broker::Queue>&);
+    void updateObserver(const boost::shared_ptr<broker::Queue>&, 
boost::shared_ptr<broker::QueueObserver>);
 
 
     Numbering<broker::SemanticState::ConsumerImpl*> consumerNumbering;

Modified: qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/QueueFlowLimitTest.cpp Thu Apr 28 12:25:59 
2011
@@ -75,8 +75,10 @@ public:
 
 QueuedMessage createMessage(uint32_t size)
 {
+    static uint32_t seqNum;
     QueuedMessage msg;
     msg.payload = MessageUtils::createMessage();
+    msg.position = ++seqNum;
     MessageUtils::addContent(msg.payload, std::string (size, 'x'));
     return msg;
 }

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Thu Apr 28 12:25:59 2011
@@ -327,7 +327,7 @@ acl allow all all
             Thread.__init__(self)
         def run(self):
             try:
-                self.sender.send(self.msg)
+                self.sender.send(self.msg, sync=True)
                 self.condition.acquire()
                 try:
                     self.blocked = False
@@ -359,11 +359,12 @@ acl allow all all
         ssn0 = brokers.first().connect().session()
         s0 = ssn0.sender("flq; {create:always, node:{type:queue, 
x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
         brokers.first().startQmf()
-        q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") 
if q.name == "flq"][0]
-        oid = q.getObjectId()
-        self.assertEqual(q.name, "flq")
-        self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, 
u'qpid.flow_resume_count': 3L})
-        assert not q.flowStopped
+        q1 = [q for q in 
brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+        oid = q1.getObjectId()
+        self.assertEqual(q1.name, "flq")
+        self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, 
u'qpid.flow_resume_count': 3L})
+        assert not q1.flowStopped
+        self.assertEqual(q1.flowStoppedCount, 0)
 
         # fill the queue on one broker until flow control is active
         for x in range(5): s0.send(Message(str(x)))
@@ -371,18 +372,20 @@ acl allow all all
         sender.start()                  # Tests that sender does block
         # Verify the broker queue goes into a flowStopped state
         deadline = time.time() + 1
-        while not q.flowStopped and time.time() < deadline: q.update()
-        assert q.flowStopped
+        while not q1.flowStopped and time.time() < deadline: q1.update()
+        assert q1.flowStopped
+        self.assertEqual(q1.flowStoppedCount, 1)
         sender.assert_blocked()         # Still blocked
 
         # Now verify the  both brokers in cluster have same configuration
         brokers.second().startQmf()
         qs = brokers.second().qmf_session.getObjects(_objectId=oid)
         self.assertEqual(len(qs), 1)
-        q = qs[0]
-        self.assertEqual(q.name, "flq")
-        self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, 
u'qpid.flow_resume_count': 3L})
-        assert q.flowStopped
+        q2 = qs[0]
+        self.assertEqual(q2.name, "flq")
+        self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, 
u'qpid.flow_resume_count': 3L})
+        assert q2.flowStopped
+        self.assertEqual(q2.flowStoppedCount, 1)
 
         # now drain the queue using a session to the other broker
         ssn1 = brokers.second().connect().session()
@@ -392,6 +395,12 @@ acl allow all all
             ssn1.acknowledge()
         sender.wait()                   # Verify no longer blocked.
 
+        # and re-verify state of queue on both brokers
+        q1.update()
+        assert not q1.flowStopped
+        q2.update()
+        assert not q2.flowStopped
+
         ssn0.connection.close()
         ssn1.connection.close()
         cluster_test_logs.verify_logs()
@@ -405,7 +414,6 @@ acl allow all all
         self.queue_flowlimit_test(Brokers())
 
     def test_queue_flowlimit_cluster(self):
-        return          # TODO aconway 2011-02-18: disabled till fixed, 
QPID-2935
         cluster = self.cluster(2)
         class Brokers:
             def first(self): return cluster[0]
@@ -413,7 +421,6 @@ acl allow all all
         self.queue_flowlimit_test(Brokers())
 
     def test_queue_flowlimit_cluster_join(self):
-        return          # TODO aconway 2011-02-18: disabled till fixed, 
QPID-2935
         cluster = self.cluster(1)
         class Brokers:
             def first(self): return cluster[0]
@@ -422,6 +429,103 @@ acl allow all all
                 return cluster[1]
         self.queue_flowlimit_test(Brokers())
 
+    def test_queue_flowlimit_replicate(self):
+        """ Verify that a queue which is in flow control BUT has drained BELOW
+        the flow control 'stop' threshold, is correctly replicated when a new
+        broker is added to the cluster.
+        """
+
+        class AsyncSender(Thread):
+            """Send a fixed number of msgs from a sender in a separate thread
+            so it may block without blocking the test.
+            """
+            def __init__(self, broker, address, count=1, size=4):
+                Thread.__init__(self)
+                self.daemon = True
+                self.broker = broker
+                self.queue = address
+                self.count = count
+                self.size = size
+                self.done = False
+
+            def run(self):
+                self.sender = subprocess.Popen(["qpid-send",
+                                                "--capacity=1",
+                                                "--content-size=%s" % 
self.size,
+                                                "--messages=%s" % self.count,
+                                                "--failover-updates",
+                                                
"--connection-options={reconnect:true}",
+                                                "--address=%s" % self.queue,
+                                                "--broker=%s" % 
self.broker.host_port()])
+                self.sender.wait()
+                self.done = True
+
+        cluster = self.cluster(2)
+        # create a queue with rather draconian flow control settings
+        ssn0 = cluster[0].connect().session()
+        s0 = ssn0.sender("flq; {create:always, node:{type:queue, 
x-declare:{arguments:{'qpid.flow_stop_count':100, 
'qpid.flow_resume_count':20}}}}")
+
+        # fire off the sending thread to broker[0], and wait until the queue
+        # hits flow control on broker[1]
+        sender = AsyncSender(cluster[0], "flq", count=110);
+        sender.start();
+
+        cluster[1].startQmf()
+        q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") 
if q.name == "flq"][0]
+        deadline = time.time() + 10
+        while not q_obj.flowStopped and time.time() < deadline:
+            q_obj.update()
+        assert q_obj.flowStopped
+        assert not sender.done
+        assert q_obj.msgDepth < 110
+
+        # Now drain enough messages on broker[1] to drop below the flow stop
+        # threshold, but not relieve flow control...
+        receiver = subprocess.Popen(["qpid-receive",
+                                     "--messages=15",
+                                     "--timeout=1",
+                                     "--print-content=no",
+                                     "--failover-updates",
+                                     "--connection-options={reconnect:true}",
+                                     "--ack-frequency=1",
+                                     "--address=flq",
+                                     "--broker=%s" % cluster[1].host_port()])
+        receiver.wait()
+        q_obj.update()
+        assert q_obj.flowStopped
+        assert not sender.done
+        current_depth = q_obj.msgDepth
+
+        # add a new broker to the cluster, and verify that the queue is in flow
+        # control on that broker
+        cluster.start()
+        cluster[2].startQmf()
+        q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") 
if q.name == "flq"][0]
+        assert q_obj.flowStopped
+        assert q_obj.msgDepth == current_depth
+
+        # now drain the queue on broker[2], and verify that the sender becomes
+        # unblocked
+        receiver = subprocess.Popen(["qpid-receive",
+                                     "--messages=95",
+                                     "--timeout=1",
+                                     "--print-content=no",
+                                     "--failover-updates",
+                                     "--connection-options={reconnect:true}",
+                                     "--ack-frequency=1",
+                                     "--address=flq",
+                                     "--broker=%s" % cluster[2].host_port()])
+        receiver.wait()
+        q_obj.update()
+        assert not q_obj.flowStopped
+        assert q_obj.msgDepth == 0
+
+        # verify that the sender has become unblocked
+        sender.join(timeout=5)
+        assert not sender.isAlive()
+        assert sender.done
+
+
     def test_alternate_exchange_update(self):
         """Verify that alternate-exchange on exchanges and queues is 
propagated to new members of a cluster. """
         cluster = self.cluster(1)
@@ -688,6 +792,41 @@ class LongTests(BrokerTest):
             for i in xrange(1000): cluster[0].connect().close()
             cluster_test_logs.verify_logs()
 
+    def test_flowlimit_failover(self):
+        """Test fail-over during continuous send-receive with flow control
+        active.
+        """
+
+        # Original cluster will all be killed so expect exit with failure
+        cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+        #for b in cluster: ErrorGenerator(b)
+
+        # create a queue with rather draconian flow control settings
+        ssn0 = cluster[0].connect().session()
+        s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, 
x-declare:{arguments:{'qpid.flow_stop_count':2000, 
'qpid.flow_resume_count':100}}}}")
+
+        receiver = NumberedReceiver(cluster[2])
+        receiver.start()
+        senders = [NumberedSender(cluster[i]) for i in range(1,3)]
+        for s in senders:
+            s.start()
+
+        # Kill original brokers, start new ones for the duration.
+        endtime = time.time() + self.duration();
+        i = 0
+        while time.time() < endtime:
+            cluster[i].kill()
+            i += 1
+            b = cluster.start(expect=EXPECT_EXIT_FAIL)
+            #ErrorGenerator(b)
+            time.sleep(5)
+            #b = cluster[0]
+            #b.startQmf()
+        for s in senders:
+            s.stop()
+        receiver.stop()
+        for i in range(i, len(cluster)): cluster[i].kill()
+
 
 class StoreTests(BrokerTest):
     """

Modified: qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/queue_flow_limit_tests.py Thu Apr 28 12:25:59 
2011
@@ -24,7 +24,7 @@ from qpid import datatypes, messaging
 from qpid.messaging import Message, Empty
 from threading import Thread, Lock
 from logging import getLogger
-from time import sleep
+from time import sleep, time
 from os import environ, popen
 
 class QueueFlowLimitTests(TestBase010):
@@ -145,11 +145,10 @@ class QueueFlowLimitTests(TestBase010):
         totalMsgs = 1213 + 797 + 331
 
         # wait until flow control is active
-        count = 0
+        deadline = time() + 10
         while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
-                count < 10:
-            sleep(1);
-            count += 1;
+                time() < deadline:
+            pass
         self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
         depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth
         self.assertGreater(depth, 373)
@@ -200,11 +199,10 @@ class QueueFlowLimitTests(TestBase010):
         totalBytes = 439 + 631 + 823
 
         # wait until flow control is active
-        count = 0
+        deadline = time() + 10
         while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \
-                count < 10:
-            sleep(1);
-            count += 1;
+                time() < deadline:
+            pass
         self.failUnless(self.qmf.getObjects(_objectId=oid)[0].flowStopped)
         self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 
351133)
 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1097432&r1=1097431&r2=1097432&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Thu Apr 28 12:25:59 2011
@@ -281,6 +281,14 @@
       <field name="position" type="uint8"/>
       <field name="count" type="uint8"/>
     </control>
+
+    <!-- Replicate a QueueObserver for a given queue. -->
+    <control name="queue-observer-state" code="0x39">
+      <field name="queue" type="str8"/>
+      <field name="observer-id" type="str8"/>
+      <field name="state" type="map"/>    <!-- "name"=value -->
+    </control>
+
   </class>
 
 </amqp>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to