Author: aconway
Date: Mon Apr  2 22:15:31 2012
New Revision: 1308597

URL: http://svn.apache.org/viewvc?rev=1308597&view=rev
Log:
QPID-3603: Broker option --ha-replicate-default to specify default replication.

Takes values 'all', 'configuration', 'all'. This is the replication level to use
if a queue or exchange is created without an explicit 'qpid.replicate' argument.

Added:
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp   (with props)
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.h
      - copied, changed from r1307592, 
qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
Modified:
    qpid/trunk/qpid/cpp/src/ha.mk
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
    qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/ha.mk
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/ha.mk?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/ha.mk (original)
+++ qpid/trunk/qpid/cpp/src/ha.mk Mon Apr  2 22:15:31 2012
@@ -25,18 +25,20 @@ dmoduleexec_LTLIBRARIES += ha.la
 ha_la_SOURCES =                                        \
   qpid/ha/Backup.cpp                           \
   qpid/ha/Backup.h                             \
+  qpid/ha/BrokerReplicator.cpp                 \
+  qpid/ha/BrokerReplicator.h                    \
+  qpid/ha/ConnectionExcluder.cpp               \
+  qpid/ha/ConnectionExcluder.h                 \
   qpid/ha/HaBroker.cpp                         \
   qpid/ha/HaBroker.h                           \
   qpid/ha/HaPlugin.cpp                         \
-  qpid/ha/Settings.h                           \
-  qpid/ha/QueueReplicator.h                    \
   qpid/ha/QueueReplicator.cpp                  \
-  qpid/ha/ReplicatingSubscription.h            \
+  qpid/ha/QueueReplicator.h                    \
+  qpid/ha/ReplicateLevel.cpp                   \
+  qpid/ha/ReplicateLevel.h                     \
   qpid/ha/ReplicatingSubscription.cpp          \
-  qpid/ha/BrokerReplicator.cpp                 \
-  qpid/ha/BrokerReplicator.h                    \
-  qpid/ha/ConnectionExcluder.cpp               \
-  qpid/ha/ConnectionExcluder.h
+  qpid/ha/ReplicatingSubscription.h            \
+  qpid/ha/Settings.h
 
 ha_la_LIBADD = libqpidbroker.la
 ha_la_LDFLAGS = $(PLUGINLDFLAGS)

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.cpp Mon Apr  2 22:15:31 2012
@@ -19,10 +19,11 @@
  *
  */
 #include "Backup.h"
-#include "Settings.h"
 #include "BrokerReplicator.h"
-#include "ReplicatingSubscription.h"
 #include "ConnectionExcluder.h"
+#include "HaBroker.h"
+#include "ReplicatingSubscription.h"
+#include "Settings.h"
 #include "qpid/Url.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/broker/Bridge.h"
@@ -43,8 +44,8 @@ using namespace broker;
 using types::Variant;
 using std::string;
 
-Backup::Backup(broker::Broker& b, const Settings& s) :
-    broker(b), settings(s), excluder(new ConnectionExcluder())
+Backup::Backup(HaBroker& hb, const Settings& s) :
+    haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new 
ConnectionExcluder())
 {
     // Empty brokerUrl means delay initialization until setUrl() is called.
     if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
@@ -63,7 +64,7 @@ void Backup::initialize(const Url& url) 
     link = result.first;
     link->setUrl(url);
 
-    replicator.reset(new BrokerReplicator(link));
+    replicator.reset(new BrokerReplicator(haBroker, link));
     broker.getExchanges().registerExchange(replicator);
     broker.getConnectionObservers().add(excluder);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Backup.h Mon Apr  2 22:15:31 2012
@@ -38,6 +38,7 @@ namespace ha {
 class Settings;
 class ConnectionExcluder;
 class BrokerReplicator;
+class HaBroker;
 
 /**
  * State associated with a backup broker. Manages connections to primary.
@@ -47,7 +48,7 @@ class BrokerReplicator;
 class Backup
 {
   public:
-    Backup(broker::Broker&, const Settings&);
+    Backup(HaBroker&, const Settings&);
     ~Backup();
     void setBrokerUrl(const Url&);
 
@@ -55,6 +56,7 @@ class Backup
     void initialize(const Url&);
 
     sys::Mutex lock;
+    HaBroker& haBroker;
     broker::Broker& broker;
     Settings settings;
     boost::shared_ptr<broker::Link> link;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon Apr  2 22:15:31 
2012
@@ -19,6 +19,7 @@
  *
  */
 #include "BrokerReplicator.h"
+#include "HaBroker.h"
 #include "QueueReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
@@ -37,6 +38,7 @@
 #include "qmf/org/apache/qpid/broker/EventQueueDelete.h"
 #include "qmf/org/apache/qpid/broker/EventSubscribe.h"
 #include <algorithm>
+#include <sstream>
 
 namespace qpid {
 namespace ha {
@@ -87,6 +89,7 @@ const string QUEUE("queue");
 const string RHOST("rhost");
 const string TYPE("type");
 const string USER("user");
+const string HA_BROKER("habroker");
 
 const string 
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
 const string QMF2("qmf2");
@@ -100,6 +103,7 @@ const string _PACKAGE_NAME("_package_nam
 const string _SCHEMA_ID("_schema_id");
 const string OBJECT("OBJECT");
 const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string ORG_APACHE_QPID_HA("org.apache.qpid.ha");
 const string QMF_DEFAULT_DIRECT("qmf.default.direct");
 const string _QUERY_REQUEST("_query_request");
 const string BROKER("broker");
@@ -113,36 +117,13 @@ template <class T> bool match(Variant::M
     return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
 }
 
-enum ReplicateLevel { RL_NONE=0, RL_CONFIGURATION, RL_ALL };
-const string S_NONE="none";
-const string S_CONFIGURATION="configuration";
-const string S_ALL="all";
-
-ReplicateLevel replicateLevel(const string& level) {
-    if (level == S_NONE) return RL_NONE;
-    if (level == S_CONFIGURATION) return RL_CONFIGURATION;
-    if (level == S_ALL) return RL_ALL;
-    throw Exception("Invalid value for "+QPID_REPLICATE+": "+level);
-}
-
-ReplicateLevel replicateLevel(const framing::FieldTable& f) {
-    if (f.isSet(QPID_REPLICATE)) return 
replicateLevel(f.getAsString(QPID_REPLICATE));
-    else return RL_NONE;
-}
-
-ReplicateLevel replicateLevel(const Variant::Map& m) {
-    Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
-    if (i != m.end()) return replicateLevel(i->second.asString());
-    else return RL_NONE;
-}
-
-void sendQuery(const string className, const string& queueName, 
SessionHandler& sessionHandler) {
+void sendQuery(const string& packageName, const string& className, const 
string& queueName, SessionHandler& sessionHandler) {
     framing::AMQP_ServerProxy peer(sessionHandler.out);
     Variant::Map request;
     request[_WHAT] = OBJECT;
     Variant::Map schema;
     schema[_CLASS_NAME] = className;
-    schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+    schema[_PACKAGE_NAME] = packageName;
     request[_SCHEMA_ID] = schema;
 
     AMQFrame method((MessageTransferBody(ProtocolVersion(), 
QMF_DEFAULT_DIRECT, 0, 0)));
@@ -181,10 +162,33 @@ Variant::Map asMapVoid(const Variant& va
 
 } // namespace
 
+
+ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) {
+    ReplicateLevel rl;
+    if (qpid::ha::replicateLevel(str, rl)) return rl;
+    else return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) {
+    if (f.isSet(QPID_REPLICATE))
+        return replicateLevel(f.getAsString(QPID_REPLICATE));
+    else
+        return haBroker.getSettings().replicateDefault;
+}
+
+ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) {
+    Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+    if (i != m.end())
+        return replicateLevel(i->second.asString());
+    else
+        return haBroker.getSettings().replicateDefault;
+}
+
 BrokerReplicator::~BrokerReplicator() {}
 
-BrokerReplicator::BrokerReplicator(const boost::shared_ptr<Link>& l)
-    : Exchange(QPID_CONFIGURATION_REPLICATOR), broker(*l->getBroker()), link(l)
+BrokerReplicator::BrokerReplicator(HaBroker& hb, const 
boost::shared_ptr<Link>& l)
+    : Exchange(QPID_CONFIGURATION_REPLICATOR),
+      haBroker(hb), broker(hb.getBroker()), link(l)
 {
     QPID_LOG(info, "HA: Backup replicating from " <<
              link->getTransport() << ":" << link->getHost() << ":" << 
link->getPort());
@@ -211,17 +215,21 @@ void BrokerReplicator::initializeBridge(
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
 
     //declare and bind an event queue
-    peer.getQueue().declare(queueName, "", false, false, true, true, 
FieldTable());
+    FieldTable declareArgs;
+    declareArgs.setString(QPID_REPLICATE, str(RL_NONE));
+    peer.getQueue().declare(queueName, "", false, false, true, true, 
declareArgs);
     peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, 
AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
     //subscribe to the queue
     peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, 
FieldTable());
     peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
     peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
 
-    //issue a query request for queues and another for exchanges using event 
queue as the reply-to address
-    sendQuery(QUEUE, queueName, sessionHandler);
-    sendQuery(EXCHANGE, queueName, sessionHandler);
-    sendQuery(BINDING, queueName, sessionHandler);
+    // Issue a query request for queues, exchanges, bindings and the habroker
+    // using event queue as the reply-to address
+    sendQuery(ORG_APACHE_QPID_HA, HA_BROKER, queueName, sessionHandler);
+    sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
+    sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
+    sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
     QPID_LOG(debug, "HA: Backup activated configuration bridge: " << 
queueName);
 }
 
@@ -257,6 +265,7 @@ void BrokerReplicator::route(Deliverable
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
+                else if (type == HA_BROKER) doResponseHaBroker(values);
                 else QPID_LOG(error, "HA: Backup received unknown response 
type=" << type
                               << " values=" << values);
             }
@@ -288,7 +297,6 @@ void BrokerReplicator::doEventQueueDecla
             // re-create from event.
             // Events are always up to date, whereas responses may be
             // out of date.
-            QPID_LOG(debug, "HA: Backup created queue: " << name);
             startQueueReplicator(result.first);
         } else {
             // FIXME aconway 2011-12-02: what's the right way to handle this?
@@ -400,7 +408,6 @@ void BrokerReplicator::doEventUnbind(Var
 
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
     QPID_LOG(debug, "HA: Backup queue response " << values);
-    // FIXME aconway 2011-11-22: more flexible ways & defaults to indicate 
replication
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
@@ -417,7 +424,6 @@ void BrokerReplicator::doResponseQueue(V
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/);
     if (result.second) {
-        QPID_LOG(debug, "HA: Backup created catch-up queue: " << values[NAME]);
         startQueueReplicator(result.first);
     } else {
         // FIXME aconway 2011-11-22: Normal to find queue already
@@ -474,7 +480,6 @@ void BrokerReplicator::doResponseBind(Va
     std::string qName = getRefName(QUEUE_REF_PREFIX, values[QUEUE_REF]);
     boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(exName);
     boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
-    // FIXME aconway 2011-11-24: more flexible configuration for binding 
replication.
 
     // Automatically replicate binding if queue and exchange exist and are 
replicated
     if (exchange && replicateLevel(exchange->getArgs()) &&
@@ -490,6 +495,28 @@ void BrokerReplicator::doResponseBind(Va
     }
 }
 
+namespace {
+const string REPLICATE_DEFAULT="replicateDefault";
+}
+
+// Received the ha-broker configuration object for the primary broker.
+void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
+    try {
+        ReplicateLevel mine = haBroker.getSettings().replicateDefault;
+        ReplicateLevel primary = 
replicateLevel(values[REPLICATE_DEFAULT].asString());
+        if (mine != primary) {
+            std::ostringstream os;
+            os << "Replicate default on backup (" << mine
+               << ") does not match primary (" <<  primary << ")";
+            haBroker.shutdown(os.str());
+        }
+    } catch (const std::exception& e) {
+        std::ostringstream os;
+        os << "Received invalid replicate default from primary: " << e.what();
+        haBroker.shutdown(os.str());
+    }
+}
+
 void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& 
queue) {
     if (replicateLevel(queue->getSettings()) == RL_ALL) {
         boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, 
link));

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Mon Apr  2 22:15:31 2012
@@ -22,6 +22,7 @@
  *
  */
 
+#include "ReplicateLevel.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/types/Variant.h"
 #include <boost/shared_ptr.hpp>
@@ -35,7 +36,12 @@ class Bridge;
 class SessionHandler;
 }
 
+namespace framing {
+class FieldTable;
+}
+
 namespace ha {
+class HaBroker;
 
 /**
  * Replicate configuration on a backup broker.
@@ -51,7 +57,7 @@ namespace ha {
 class BrokerReplicator : public broker::Exchange
 {
   public:
-    BrokerReplicator(const boost::shared_ptr<broker::Link>&);
+    BrokerReplicator(HaBroker&, const boost::shared_ptr<broker::Link>&);
     ~BrokerReplicator();
     std::string getType() const;
 
@@ -64,6 +70,10 @@ class BrokerReplicator : public broker::
   private:
     void initializeBridge(broker::Bridge&, broker::SessionHandler&);
 
+    ReplicateLevel replicateLevel(const std::string&);
+    ReplicateLevel replicateLevel(const framing::FieldTable& args);
+    ReplicateLevel replicateLevel(const types::Variant::Map& args);
+
     void doEventQueueDeclare(types::Variant::Map& values);
     void doEventQueueDelete(types::Variant::Map& values);
     void doEventExchangeDeclare(types::Variant::Map& values);
@@ -74,9 +84,11 @@ class BrokerReplicator : public broker::
     void doResponseQueue(types::Variant::Map& values);
     void doResponseExchange(types::Variant::Map& values);
     void doResponseBind(types::Variant::Map& values);
+    void doResponseHaBroker(types::Variant::Map& values);
 
     void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
 
+    HaBroker& haBroker;
     broker::Broker& broker;
     boost::shared_ptr<broker::Link> link;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon Apr  2 22:15:31 2012
@@ -27,6 +27,7 @@
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/SignalHandler.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/ha/Package.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
@@ -69,16 +70,18 @@ HaBroker::HaBroker(broker::Broker& b, co
         throw Exception("Cannot start HA: management is disabled");
     _qmf::Package  packageInit(ma);
     mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
-    // FIXME aconway 2012-03-01: should start in catch-up state and move to 
backup
-    // only when caught up.
     mgmtObject->set_status(BACKUP);
+    mgmtObject->set_replicateDefault(str(settings.replicateDefault));
     ma->addObject(mgmtObject);
+
+    // NOTE: lock is not needed in a constructor but we created it just to pass
+    // to the set functions.
     sys::Mutex::ScopedLock l(lock);
     if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
     if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
 
     // If we are in a cluster, we start in backup mode.
-    if (settings.cluster) backup.reset(new Backup(b, s));
+    if (settings.cluster) backup.reset(new Backup(*this, s));
 }
 
 HaBroker::~HaBroker() {}
@@ -169,4 +172,9 @@ std::vector<Url> HaBroker::getKnownBroke
     return knownBrokers;
 }
 
+void HaBroker::shutdown(const std::string& message) {
+    QPID_LOG(critical, "Shutting down: " << message);
+    broker::SignalHandler::shutdown();
+}
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Mon Apr  2 22:15:31 2012
@@ -52,6 +52,11 @@ class HaBroker : public management::Mana
     management::Manageable::status_t ManagementMethod (
         uint32_t methodId, management::Args& args, std::string& text);
 
+    broker::Broker& getBroker() { return broker; }
+    const Settings& getSettings() const { return settings; }
+
+    // Log a critical error message and shut down the broker.
+    void shutdown(const std::string& message);
   private:
     void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
     void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp Mon Apr  2 22:15:31 2012
@@ -37,6 +37,9 @@ struct Options : public qpid::Options {
              "URL that backup brokers use to connect and fail over.")
             ("ha-public-brokers", optValue(settings.clientUrl,"URL"),
              "URL that clients use to connect and fail over, defaults to 
ha-brokers.")
+            ("ha-replicate-default",
+             optValue(settings.replicateDefault, "LEVEL"),
+            "Replication level for creating queues and exchanges if there is 
no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'")
             ("ha-expected-backups", optValue(settings.expectedBackups, "N"),
              "Number of backups expected to be active in the HA cluster.")
             ("ha-username", optValue(settings.username, "USER"),

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon Apr  2 22:15:31 2012
@@ -53,7 +53,7 @@ std::string QueueReplicator::replicatorN
 QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, 
boost::shared_ptr<Link> l)
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), 
link(l)
 {
-    logPrefix = "HA: Backup " + queue->getName() + ": ";
+    logPrefix = "HA: Backup of queue " + queue->getName() + ": ";
     QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
 }
 

Added: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp?rev=1308597&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp (added)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp Mon Apr  2 22:15:31 2012
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReplicateLevel.h"
+#include "qpid/Exception.h"
+#include <iostream>
+
+namespace qpid {
+namespace ha {
+
+using namespace std;
+
+namespace {
+const string S_NONE="none";
+const string S_CONFIGURATION="configuration";
+const string S_ALL="all";
+
+string names[] = { S_NONE, S_CONFIGURATION, S_ALL };
+}
+
+bool replicateLevel(const string& level, ReplicateLevel& out) {
+    if (level == S_NONE) { out = RL_NONE; return true; }
+    if (level == S_CONFIGURATION) { out = RL_CONFIGURATION; return true; }
+    if (level == S_ALL) { out = RL_ALL; return true; }
+    return false;
+}
+
+ReplicateLevel replicateLevel(const string& level) {
+    ReplicateLevel rl;
+    if (!replicateLevel(level, rl))
+        throw Exception("Invalid value for replication level: "+level);
+    return rl;
+}
+
+string str(ReplicateLevel l) { return  names[l]; }
+
+ostream& operator<<(ostream& o, ReplicateLevel rl) { return o << str(rl); }
+istream& operator>>(istream& i, ReplicateLevel& rl) {
+    string str;
+    i >> str;
+    rl = replicateLevel(str);
+    return i;
+}
+
+}} // namespace qpid::ha

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.h (from r1307592, 
qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h)
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.h?p2=qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.h&p1=qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h&r1=1307592&r2=1308597&rev=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicateLevel.h Mon Apr  2 22:15:31 2012
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_SETTINGS_H
-#define QPID_HA_SETTINGS_H
+#ifndef QPID_HA_REPLICATELEVEL_H
+#define QPID_HA_REPLICATELEVEL_H
 
 /*
  *
@@ -23,24 +23,30 @@
  */
 
 #include <string>
+#include <iosfwd>
 
 namespace qpid {
 namespace ha {
 
+enum ReplicateLevel { RL_NONE, RL_CONFIGURATION, RL_ALL };
+
+/**
+ * If str is a valid replicate level, set out and return true.
+ */
+bool replicateLevel(const std::string& str, ReplicateLevel& out);
+
 /**
- * Configurable settings for HA.
+ *@return enum corresponding to string level.
+ *@throw qpid::Exception if level is not a valid replication level.
  */
-class Settings
-{
-  public:
-    Settings() : cluster(false), expectedBackups(0) {}
-    bool cluster;               // True if we are a cluster member.
-    std::string clientUrl;
-    std::string brokerUrl;
-    size_t expectedBackups;
-    std::string username, password, mechanism;
-  private:
-};
-}} // namespace qpid::ha
+ReplicateLevel replicateLevel(const std::string& level);
+
+/**@return string form of replicate level */
+std::string str(ReplicateLevel l);
+
+std::ostream& operator<<(std::ostream&, ReplicateLevel);
+std::istream& operator>>(std::istream&, ReplicateLevel&);
+
+}} // namespaces qpid::ha
 
-#endif  /*!QPID_HA_SETTINGS_H*/
+#endif  /*!QPID_HA_REPLICATELEVEL_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h Mon Apr  2 22:15:31 2012
@@ -22,6 +22,7 @@
  *
  */
 
+#include "ReplicateLevel.h"
 #include <string>
 
 namespace qpid {
@@ -33,11 +34,12 @@ namespace ha {
 class Settings
 {
   public:
-    Settings() : cluster(false), expectedBackups(0) {}
+    Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) 
{}
     bool cluster;               // True if we are a cluster member.
     std::string clientUrl;
     std::string brokerUrl;
     size_t expectedBackups;
+    ReplicateLevel replicateDefault;
     std::string username, password, mechanism;
   private:
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml Mon Apr  2 22:15:31 
2012
@@ -32,7 +32,11 @@
              desc="Multiple-address URL used by clients to connect to the HA 
brokers."/>
 
     <property name="expectedBackups" type="uint16"
-             desc="Number of HA backup brokers expected."/>>
+             desc="Number of HA backup brokers expected."/>
+
+    <property
+       name="replicateDefault" type="sstr"
+       desc="Replicate value for queues/exchanges without a qpid.replicate 
argument"/>
 
     <method name="promote" desc="Promote a backup broker to primary."/>
 
@@ -48,7 +52,7 @@
       <arg name="expectedBackups" type="uint16" dir="I"/>
     </method>
 
-    <method name="replicate" desc="Replicate from a remote queue to the local 
broker.">
+    <method name="replicate" desc="Replicate individual queue from remote 
broker.">
       <arg name="broker" type="sstr" dir="I"/>
       <arg name="queue" type="sstr" dir="I"/>
     </method>

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1308597&r1=1308596&r2=1308597&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Apr  2 22:15:31 2012
@@ -29,13 +29,17 @@ from qpidtoollibs import BrokerAgent
 log = getLogger("qpid.ha-tests")
 
 class HaBroker(Broker):
-    def __init__(self, test, args=[], broker_url=None, ha_cluster=True, 
**kwargs):
+    def __init__(self, test, args=[], broker_url=None, ha_cluster=True,
+                 ha_replicate_default="all", **kwargs):
         assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
         args = copy(args)
-        args.extend(["--load-module", BrokerTest.ha_lib,
-                     # FIXME aconway 2012-02-13: workaround slow link failover.
-                     "--link-maintenace-interval=0.1",
-                     "--ha-cluster=%s"%ha_cluster])
+        args += ["--load-module", BrokerTest.ha_lib,
+                 "--log-enable=info+", "--log-enable=debug+:ha::",
+                 # FIXME aconway 2012-02-13: workaround slow link failover.
+                 "--link-maintenace-interval=0.1",
+                 "--ha-cluster=%s"%ha_cluster]
+        if ha_replicate_default is not None:
+            args += [ "--ha-replicate-default=%s"%ha_replicate_default ]
         if broker_url: args.extend([ "--ha-brokers", broker_url ])
         Broker.__init__(self, test, args, **kwargs)
         self.commands=os.getenv("PYTHON_COMMANDS")
@@ -64,6 +68,10 @@ class HaBroker(Broker):
         assert os.system(
             "%s/qpid-config --broker=%s add queue %s --replicate 
%s"%(self.commands, self.host_port(), queue, replication)) == 0
 
+    def connect_admin(self, **kwargs):
+        return Broker.connect(self, client_properties={"qpid.ha-admin":1}, 
**kwargs)
+
+
 class HaCluster(object):
     _cluster_count = 0
 
@@ -72,9 +80,9 @@ class HaCluster(object):
         self.test = test
         self._brokers = [ HaBroker(test, 
name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
         HaCluster._cluster_count += 1
-        self[0].promote()
         self.url = ",".join([b.host_port() for b in self])
         for b in self: b.set_broker_url(self.url)
+        self[0].promote()
 
     def connect(self, i):
         """Connect with reconnect_urls"""
@@ -98,8 +106,6 @@ class HaCluster(object):
     def __iter__(self): return self._brokers.__iter__()
 
 
-def qr_node(value="all"): return 
"node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-
 class HaTest(BrokerTest):
     """Base class for HA test cases, defines convenience functions"""
 
@@ -114,13 +120,13 @@ class HaTest(BrokerTest):
 
     # Wait for address to become valid on a backup broker.
     def wait_backup(self, backup, address):
-        bs = self.connect_admin(backup).session()
+        bs = backup.connect_admin().session()
         self.wait(bs, address)
         bs.connection.close()
 
     # Combines wait_backup and assert_browse_retry
     def assert_browse_backup(self, backup, queue, expected, **kwargs):
-        bs = self.connect_admin(backup).session()
+        bs = backup.connect_admin().session()
         self.wait(bs, queue)
         self.assert_browse_retry(bs, queue, expected, **kwargs)
         bs.connection.close()
@@ -128,12 +134,9 @@ class HaTest(BrokerTest):
     def assert_missing(self, session, address):
         try:
             session.receiver(address)
-            self.fail("Should not have been replicated: %s"%(address))
+            self.fail("Expected NotFound: %s"%(address))
         except NotFound: pass
 
-    def connect_admin(self, backup, **kwargs):
-        """Connect to a backup broker as an admin connection"""
-        return backup.connect(client_properties={"qpid.ha-admin":1}, **kwargs)
 
 class ReplicationTests(HaTest):
     """Correctness tests for  HA replication."""
@@ -173,7 +176,6 @@ class ReplicationTests(HaTest):
 
         def verify(b, prefix, p):
             """Verify setup was replicated to backup b"""
-
             # Wait for configuration to replicate.
             self.wait(b, prefix+"x");
             self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
@@ -203,7 +205,7 @@ class ReplicationTests(HaTest):
         setup(p, "2", primary)
 
         # Verify the data on the backup
-        b = self.connect_admin(backup).session()
+        b = backup.connect_admin().session()
         verify(b, "1", p)
         verify(b, "2", p)
         # Test a series of messages, enqueue all then dequeue all.
@@ -230,12 +232,10 @@ class ReplicationTests(HaTest):
             self.assert_browse_retry(b, "foo", msgs[i+1:])
 
     def test_sync(self):
-        def queue(name, replicate):
-            return "%s;{create:always,%s}"%(name, qr_node(replicate))
         primary = HaBroker(self, name="primary")
         primary.promote()
         p = primary.connect().session()
-        s = p.sender(queue("q","all"))
+        s = p.sender("q;{create:always}")
         for m in [str(i) for i in range(0,10)]: s.send(m)
         s.sync()
         backup1 = HaBroker(self, name="backup1", 
broker_url=primary.host_port())
@@ -246,46 +246,37 @@ class ReplicationTests(HaTest):
         s.sync()
 
         msgs = [str(i) for i in range(30)]
-        b1 = self.connect_admin(backup1).session()
+        b1 = backup1.connect_admin().session()
         self.wait(b1, "q");
         self.assert_browse_retry(b1, "q", msgs)
-        b2 = self.connect_admin(backup2).session()
+        b2 = backup2.connect_admin().session()
         self.wait(b2, "q");
         self.assert_browse_retry(b2, "q", msgs)
 
     def test_send_receive(self):
         """Verify sequence numbers of messages sent by qpid-send"""
+        brokers = HaCluster(self, 3)
         getLogger().setLevel(ERROR) # Hide expected WARNING log messages from 
failover.
-        primary = HaBroker(self, name="primary")
-        primary.promote()
-        backup1 = HaBroker(self, name="backup1", 
broker_url=primary.host_port())
-        backup2 = HaBroker(self, name="backup2", 
broker_url=primary.host_port())
         sender = self.popen(
             ["qpid-send",
-             "--broker", primary.host_port(),
-             "--address", "q;{create:always,%s}"%(qr_node("all")),
+             "--broker", brokers[0].host_port(),
+             "--address", "q;{create:always}",
              "--messages=1000",
              "--content-string=x"
              ])
         receiver = self.popen(
             ["qpid-receive",
-             "--broker", primary.host_port(),
-             "--address", "q;{create:always,%s}"%(qr_node("all")),
+             "--broker", brokers[0].host_port(),
+             "--address", "q;{create:always}",
              "--messages=990",
              "--timeout=10"
              ])
-        try:
-            self.assertEqual(sender.wait(), 0)
-            self.assertEqual(receiver.wait(), 0)
-            expect = [long(i) for i in range(991, 1001)]
-            sn = lambda m: m.properties["sn"]
-            self.assert_browse_retry(self.connect_admin(backup1).session(), 
"q", expect, transform=sn)
-            self.assert_browse_retry(self.connect_admin(backup2).session(), 
"q", expect, transform=sn)
-        except:
-            print self.browse(primary.connect().session(), "q", transform=sn)
-            print self.browse(self.connect_admin(backup1).session(), "q", 
transform=sn)
-            print self.browse(self.connect_admin(backup2).session(), "q", 
transform=sn)
-            raise
+        self.assertEqual(sender.wait(), 0)
+        self.assertEqual(receiver.wait(), 0)
+        expect = [long(i) for i in range(991, 1001)]
+        sn = lambda m: m.properties["sn"]
+        self.assert_browse_backup(brokers[1], "q", expect, transform=sn)
+        self.assert_browse_backup(brokers[2], "q", expect, transform=sn)
 
     def test_failover_python(self):
         """Verify that backups rejects connections and that fail-over works in 
python client"""
@@ -299,12 +290,12 @@ class ReplicationTests(HaTest):
             self.fail("Expected connection to backup to fail")
         except ConnectionError: pass
         # Check that admin connections are allowed to backup.
-        self.connect_admin(backup).close()
+        backup.connect_admin().close()
 
         # Test discovery: should connect to primary after reject by backup
         c = backup.connect(reconnect_urls=[primary.host_port(), 
backup.host_port()], reconnect=True)
         s = c.session()
-        sender = s.sender("q;{create:always,%s}"%(qr_node()))
+        sender = s.sender("q;{create:always}")
         self.wait_backup(backup, "q")
         sender.send("foo")
         primary.kill()
@@ -319,7 +310,7 @@ class ReplicationTests(HaTest):
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
         url="%s,%s"%(primary.host_port(), backup.host_port())
-        primary.connect().session().sender("q;{create:always,%s}"%(qr_node()))
+        primary.connect().session().sender("q;{create:always}")
         self.wait_backup(backup, "q")
 
         sender = NumberedSender(primary, url=url, queue="q", failover_updates 
= False)
@@ -340,8 +331,7 @@ class ReplicationTests(HaTest):
     def test_backup_failover(self):
         """Verify that a backup broker fails over and recovers queue state"""
         brokers = HaCluster(self, 3)
-        brokers[0].connect().session().sender(
-            "q;{create:always,%s}"%(qr_node())).send("a")
+        brokers[0].connect().session().sender("q;{create:always}").send("a")
         for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
         brokers[0].expect = EXPECT_EXIT_FAIL
         brokers.kill(0)
@@ -362,11 +352,11 @@ class ReplicationTests(HaTest):
     def test_standalone_queue_replica(self):
         """Test replication of individual queues outside of cluster mode"""
         getLogger().setLevel(ERROR) # Hide expected WARNING log messages from 
failover.
-        primary = HaBroker(self, name="primary", ha_cluster=False, 
args=["--log-enable=debug+"])
+        primary = HaBroker(self, name="primary", ha_cluster=False)
         pc = primary.connect()
         ps = pc.session().sender("q;{create:always}")
         pr = pc.session().receiver("q;{create:always}")
-        backup = HaBroker(self, name="backup", ha_cluster=False, 
args=["--log-enable=debug+"])
+        backup = HaBroker(self, name="backup", ha_cluster=False)
         br = backup.connect().session().receiver("q;{create:always}")
 
         # Set up replication with qpid-ha
@@ -392,9 +382,9 @@ class ReplicationTests(HaTest):
         cluster = HaCluster(self, 2)
         primary = cluster[0]
         pc = cluster.connect(0)
-        ps = pc.session().sender("q;{create:always,%s}"%qr_node("all"))
-        pr = pc.session().receiver("q;{create:always,%s}"%qr_node("all"))
-        backup = HaBroker(self, name="backup", ha_cluster=False, 
args=["--log-enable=debug+"])
+        ps = pc.session().sender("q;{create:always}")
+        pr = pc.session().receiver("q;{create:always}")
+        backup = HaBroker(self, name="backup", ha_cluster=False)
         br = backup.connect().session().receiver("q;{create:always}")
         backup.replicate(cluster.url, "q")
         ps.send("a")
@@ -413,7 +403,7 @@ class ReplicationTests(HaTest):
         primary  = HaBroker(self, name="primary")
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
-        s = primary.connect().session().sender("lvq; {create:always, 
node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key, 
'qpid.replicate':all}}}}")
+        s = primary.connect().session().sender("lvq; {create:always, 
node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
         def send(key,value): 
s.send(Message(content=value,properties={"lvq-key":key}))
         for kv in 
[("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
             send(*kv)
@@ -426,19 +416,21 @@ class ReplicationTests(HaTest):
         self.assert_browse_backup(backup, "lvq", ["a-3", "b-2", "c-3", "d-1"])
 
     def test_ring(self):
+        """Test replication with the ring queue policy"""
         primary  = HaBroker(self, name="primary")
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
-        s = primary.connect().session().sender("q; {create:always, 
node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 
'qpid.replicate':all}}}}")
+        s = primary.connect().session().sender("q; {create:always, 
node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
         for i in range(10): s.send(Message(str(i)))
         self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
 
     def test_reject(self):
+        """Test replication with the reject queue policy"""
         getLogger().setLevel(ERROR) # Hide expected WARNING log messages from 
failover.
         primary  = HaBroker(self, name="primary")
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
-        s = primary.connect().session().sender("q; {create:always, 
node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5, 
'qpid.replicate':all}}}}")
+        s = primary.connect().session().sender("q; {create:always, 
node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
         try:
             for i in range(10): s.send(Message(str(i)), sync=False)
         except qpid.messaging.exceptions.TargetCapacityExceeded: pass
@@ -450,12 +442,12 @@ class ReplicationTests(HaTest):
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
         session = primary.connect().session()
-        s = session.sender("priority-queue; {create:always, 
node:{x-declare:{arguments:{'qpid.priorities':10, 'qpid.replicate':all}}}}")
+        s = session.sender("priority-queue; {create:always, 
node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
         priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
         for p in priorities: s.send(Message(priority=p))
         # Can't use browse_backup as browser sees messages in delivery order 
not priority.
         self.wait_backup(backup, "priority-queue")
-        r = self.connect_admin(backup).session().receiver("priority-queue")
+        r = backup.connect_admin().session().receiver("priority-queue")
         received = [r.fetch().priority for i in priorities]
         self.assertEqual(sorted(priorities, reverse=True), received)
 
@@ -469,11 +461,11 @@ class ReplicationTests(HaTest):
         priorities = 
[4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
         limits={7:0,6:4,5:3,4:2,3:2,2:2,1:2}
         limit_policy = ",".join(["'qpid.fairshare':5"] + 
["'qpid.fairshare-%s':%s"%(i[0],i[1]) for i in limits.iteritems()])
-        s = session.sender("priority-queue; {create:always, 
node:{x-declare:{arguments:{'qpid.priorities':%s, %s, 
'qpid.replicate':all}}}}"%(levels,limit_policy))
+        s = session.sender("priority-queue; {create:always, 
node:{x-declare:{arguments:{'qpid.priorities':%s, %s}}}}"%(levels,limit_policy))
         messages = [Message(content=str(uuid4()), priority = p) for p in 
priorities]
         for m in messages: s.send(m)
         self.wait_backup(backup, s.target)
-        r = self.connect_admin(backup).session().receiver("priority-queue")
+        r = backup.connect_admin().session().receiver("priority-queue")
         received = [r.fetch().content for i in priorities]
         sort = sorted(messages, key=lambda m: priority_level(m.priority, 
levels), reverse=True)
         fair = [m.content for m in fairshare(sort, lambda l: limits.get(l,0), 
levels)]
@@ -483,13 +475,14 @@ class ReplicationTests(HaTest):
         primary  = HaBroker(self, name="primary")
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
-        s = primary.connect().session().sender("q; {create:always, 
node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 
'qpid.priorities':10, 'qpid.replicate':all}}}}")
+        s = primary.connect().session().sender("q; {create:always, 
node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 
'qpid.priorities':10}}}}")
         priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
         for p in priorities: s.send(Message(priority=p))
-        # FIXME aconway 2012-02-22: there is a bug in priority ring queues 
that allows a low
-        # priority message to displace a high one. The following commented-out 
assert_browse
-        # is for the correct result, the uncommented one is for the actualy 
buggy result.
-        # See https://issues.apache.org/jira/browse/QPID-3866
+        # FIXME aconway 2012-02-22: there is a bug in priority ring
+        # queues that allows a low priority message to displace a high
+        # one. The following commented-out assert_browse is for the
+        # correct result, the uncommented one is for the actualy buggy
+        # result.  See https://issues.apache.org/jira/browse/QPID-3866
         #
         # self.assert_browse_backup(backup, "q", 
sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
         self.assert_browse_backup(backup, "q", [9,9,9,9,2], transform=lambda 
m: m.priority)
@@ -539,6 +532,29 @@ class ReplicationTests(HaTest):
         for t in tests: t.verify(self, backup1)
         for t in tests: t.verify(self, backup2)
 
+    def test_replicate_default(self):
+        """Make sure we don't replicate if ha-replicate-default is unspecified 
or none"""
+        cluster1 = HaCluster(self, 2, ha_replicate_default=None)
+        c1 = cluster1[0].connect().session().sender("q;{create:always}")
+        cluster2 = HaCluster(self, 2, ha_replicate_default="none")
+        cluster2[0].connect().session().sender("q;{create:always}")
+        time.sleep(.1)               # Give replication a chance.
+        try:
+            cluster1[1].connect_admin().session().receiver("q")
+            self.fail("Excpected no-such-queue exception")
+        except NotFound: pass
+        try:
+            cluster2[1].connect_admin().session().receiver("q")
+            self.fail("Excpected no-such-queue exception")
+        except NotFound: pass
+
+    def test_invalid_default(self):
+        """Verify that a queue with an invalid qpid.replicate gets default 
treatment"""
+        cluster = HaCluster(self, 2, ha_replicate_default="all")
+        c = cluster[0].connect().session().sender("q;{create:always, 
node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+        self.wait_backup(cluster[1], "q")
+
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given 
fairshare limit



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to