Author: aconway
Date: Mon Apr 7 15:39:19 2014
New Revision: 1585507
URL: http://svn.apache.org/r1585507
Log:
QPID-5666: HA fails with resource-limit-exceeded: Exceeded replicated queue limit
This is a regression introduced in r1561206 (CommitDate: Fri Jan 24 21:54:59 2014 +0000):
QPID-5513: HA backup fails if number of replicated queues exceeds number of
channels.
Fixed by the current commit. PrimaryQueueLimits was not taking account of
queues already on the broker prior to promotion.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
qpid/trunk/qpid/cpp/src/tests/ha_test.py
qpid/trunk/qpid/cpp/src/tests/ha_tests.py
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1585507&r1=1585506&r2=1585507&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Mon Apr 7 15:39:19 2014
@@ -136,7 +136,7 @@ Primary::Primary(HaBroker& hb, const Bro
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get()),
sessionHandlerObserver(new PrimarySessionHandlerObserver(logPrefix)),
- queueLimits(logPrefix)
+ queueLimits(logPrefix, hb.getBroker().getQueues(), replicationTest)
{
// Note that at this point, we are still rejecting client connections.
// So we are safe from client interference while we set up the primary.
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h?rev=1585507&r1=1585506&r2=1585507&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryQueueLimits.h Mon Apr 7 15:39:19 2014
@@ -22,9 +22,12 @@
*
*/
+#include "ReplicationTest.h"
#include <qpid/broker/Queue.h>
+#include <qpid/broker/QueueRegistry.h>
#include <qpid/framing/amqp_types.h>
#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
#include <string>
namespace qpid {
@@ -45,8 +48,15 @@ class PrimaryQueueLimits
{
public:
// FIXME aconway 2014-01-24: hardcoded maxQueues, use negotiated
channel-max
- PrimaryQueueLimits(const std::string& lp) :
- logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0) {}
+ PrimaryQueueLimits(const std::string& lp,
+ broker::QueueRegistry& qr,
+ const ReplicationTest& rt
+ ) :
+ logPrefix(lp), maxQueues(framing::CHANNEL_MAX-100), queues(0)
+ {
+ // Get initial count of replicated queues
+ qr.eachQueue(boost::bind(&PrimaryQueueLimits::addQueueIfReplicated,
this, _1, rt));
+ }
/** Add a replicated queue
*@exception ResourceLimitExceededException if this would exceed the limit.
@@ -57,15 +67,22 @@ class PrimaryQueueLimits
<< " exceeds limit of " << maxQueues
<< " replicated queues.");
throw framing::ResourceLimitExceededException(
- "Exceeded replicated queue limit.");
+ Msg() << "Exceeded replicated queue limit " << queues << " >=
" << maxQueues);
}
else ++queues;
}
+ void addQueueIfReplicated(const boost::shared_ptr<broker::Queue>& q, const
ReplicationTest& rt) {
+ if(rt.useLevel(*q)) addQueue(q);
+ }
+
/** Remove a replicated queue.
* @pre Was previously added with addQueue
*/
- void removeQueue(const boost::shared_ptr<broker::Queue>&) { --queues; }
+ void removeQueue(const boost::shared_ptr<broker::Queue>&) {
+ assert(queues != 0);
+ --queues;
+ }
// TODO aconway 2014-01-24: Currently replication links always use the
// hard-coded framing::CHANNEL_MAX. In future (e.g. when we support AMQP1.0
@@ -83,7 +100,7 @@ class PrimaryQueueLimits
std::string logPrefix;
uint64_t maxQueues;
uint64_t queues;
-};
+};
}} // namespace qpid::ha
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1585507&r1=1585506&r2=1585507&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Mon Apr 7 15:39:19 2014
@@ -41,7 +41,7 @@ RemoteBackup::RemoteBackup(
std::ostringstream oss;
oss << "Remote backup at " << info << ": ";
logPrefix = oss.str();
- QPID_LOG(debug, logPrefix << "Connected");
+ QPID_LOG(debug, logPrefix << (c? "Connected" : "Expected"));
}
RemoteBackup::~RemoteBackup() {
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1585507&r1=1585506&r2=1585507&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp Mon Apr 7 15:39:19 2014
@@ -29,20 +29,20 @@ namespace ha {
using types::Variant;
-ReplicateLevel ReplicationTest::getLevel(const std::string& str) {
+ReplicateLevel ReplicationTest::getLevel(const std::string& str) const {
Enum<ReplicateLevel> rl(replicateDefault);
if (!str.empty()) rl.parse(str);
return rl.get();
}
-ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) {
+ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) const {
if (f.isSet(QPID_REPLICATE))
return getLevel(f.getAsString(QPID_REPLICATE));
else
return replicateDefault;
}
-ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
+ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) const {
Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
if (i != m.end())
return getLevel(i->second.asString());
@@ -50,7 +50,7 @@ ReplicateLevel ReplicationTest::getLevel
return replicateDefault;
}
-ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
+ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) const {
const Variant::Map& qmap(q.getSettings().original);
Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE);
if (i != qmap.end())
@@ -59,16 +59,15 @@ ReplicateLevel ReplicationTest::getLevel
return getLevel(q.getSettings().storeSettings);
}
-ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) {
+ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) const {
return getLevel(ex.getArgs());
}
-ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q)
-{
+ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q) const {
return q.getSettings().isTemporary ? ReplicationTest(NONE).getLevel(q) :
getLevel(q);
}
-ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) {
+ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) const {
return ReplicationTest::getLevel(ex);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1585507&r1=1585506&r2=1585507&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h Mon Apr 7 15:39:19 2014
@@ -56,18 +56,18 @@ class ReplicationTest
replicateDefault(replicateDefault_) {}
// Get the replication level set on an object, or default if not set.
- ReplicateLevel getLevel(const std::string& str);
- ReplicateLevel getLevel(const framing::FieldTable& f);
- ReplicateLevel getLevel(const types::Variant::Map& m);
- ReplicateLevel getLevel(const broker::Queue&);
- ReplicateLevel getLevel(const broker::Exchange&);
+ ReplicateLevel getLevel(const std::string& str) const;
+ ReplicateLevel getLevel(const framing::FieldTable& f) const;
+ ReplicateLevel getLevel(const types::Variant::Map& m) const;
+ ReplicateLevel getLevel(const broker::Queue&) const;
+ ReplicateLevel getLevel(const broker::Exchange&) const;
// Calculate level for objects that may not have replication set,
// including auto-delete/exclusive settings.
- ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete,
bool exclusive);
- ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete,
bool exclusive);
- ReplicateLevel useLevel(const broker::Queue&);
- ReplicateLevel useLevel(const broker::Exchange&);
+ ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete,
bool exclusive) const;
+ ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete,
bool exclusive) const;
+ ReplicateLevel useLevel(const broker::Queue&) const;
+ ReplicateLevel useLevel(const broker::Exchange&) const;
private:
ReplicateLevel replicateDefault;
Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1585507&r1=1585506&r2=1585507&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Mon Apr 7 15:39:19 2014
@@ -196,7 +196,7 @@ acl allow all all
def ha_status(self): return self.qmf().status
- def wait_status(self, status):
+ def wait_status(self, status, timeout=5):
def try_get_status():
self._status = "<unknown>"
# Ignore ConnectionError, the broker may not be up yet.
@@ -204,7 +204,7 @@ acl allow all all
self._status = self.ha_status()
return self._status == status;
except ConnectionError: return False
- assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%(
+ assert retry(try_get_status, timeout=timeout), "%s expected=%r,
actual=%r"%(
self, status, self._status)
def wait_queue(self, queue, timeout=1):
Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1585507&r1=1585506&r2=1585507&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Apr 7 15:39:19 2014
@@ -885,6 +885,19 @@ acl deny all all
old_sess.exchange_declare(exchange='ex1', type='fanout')
cluster[1].wait_backup("ex1")
+ def test_resource_limit_bug(self):
+ """QPID-5666 Regression test: Incorrect resource limit exception for
queue creation."""
+ cluster = HaCluster(self, 3)
+ qs = ["q%s"%i for i in xrange(10)]
+ s = cluster[0].connect().session()
+ s.sender("q;{create:always}").close()
+ cluster.kill(0)
+ cluster[1].promote()
+ cluster[1].wait_status("active")
+ s = cluster[1].connect().session()
+ s.receiver("q;{delete:always}").close()
+ s.sender("qq;{create:always}").close()
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given
fairshare limit
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]