Author: aconway
Date: Thu Oct 18 19:42:14 2012
New Revision: 1399813

URL: http://svn.apache.org/viewvc?rev=1399813&view=rev
Log:
Bug 867030 - QPID-4374: Use configurable credit window for HA backup 
subscriptions (Jason Dillaman)

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Oct 18 19:42:14 2012
@@ -312,9 +312,14 @@ void BrokerReplicator::initializeBridge(
     peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_BROKER, 
FieldTable());
     peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_EVENT_HA, 
FieldTable());
     //subscribe to the queue
-    peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, 
FieldTable());
-    peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
-    peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+    FieldTable arguments;
+    arguments.setInt(QueueReplicator::QPID_SYNC_FREQUENCY, 1); // FIXME 
aconway 2012-05-22: optimize?
+    peer.getMessage().subscribe(
+        queueName, args.i_dest, 1/*accept-none*/, 0/*pre-acquired*/,
+        false/*exclusive*/, "", 0, arguments);
+    peer.getMessage().setFlowMode(args.i_dest, 1); // Window
+    peer.getMessage().flow(args.i_dest, 0, 
haBroker.getSettings().getFlowMessages());
+    peer.getMessage().flow(args.i_dest, 1, 
haBroker.getSettings().getFlowBytes());
 
     // Issue a query request for queues, exchanges, bindings and the habroker
     // using event queue as the reply-to address

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp Thu Oct 18 19:42:14 2012
@@ -48,6 +48,10 @@ struct Options : public qpid::Options {
              "Authentication mechanism for connections between HA brokers")
             ("ha-backup-timeout", optValue(settings.backupTimeout, "SECONDS"),
              "Maximum time to wait for an expected backup to connect and 
become ready.")
+            ("ha-flow-messages", optValue(settings.flowMessages, "N"),
+             "Flow control message count limit for replication, 0 means no 
limit")
+            ("ha-flow-bytes", optValue(settings.flowBytes, "N"),
+             "Flow control byte limit for replication, 0 means no limit")
             ;
     }
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Oct 18 19:42:14 2012
@@ -22,6 +22,7 @@
 #include "HaBroker.h"
 #include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
+#include "Settings.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
@@ -39,7 +40,6 @@
 namespace {
 const std::string QPID_REPLICATOR_("qpid.replicator-");
 const std::string TYPE_NAME("qpid.queue-replicator");
-const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 }
 
 namespace qpid {
@@ -52,6 +52,7 @@ using sys::Mutex;
 const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
 const std::string 
QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
 const std::string 
QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position");
+const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
 
 std::string QueueReplicator::replicatorName(const std::string& queueName) {
     return QPID_REPLICATOR_ + queueName;
@@ -107,7 +108,8 @@ QueueReplicator::QueueReplicator(HaBroke
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
       haBroker(hb),
       logPrefix("Backup queue "+q->getName()+": "),
-      queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false)
+      queue(q), link(l), brokerInfo(hb.getBrokerInfo()), subscribed(false),
+      settings(hb.getSettings())
 {
     args.setString(QPID_REPLICATE, printable(NONE).str());
     Uuid uuid(true);
@@ -165,23 +167,21 @@ void QueueReplicator::initializeBridge(B
     if (!queue) return;         // Already destroyed
     AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
-    FieldTable settings;
-    settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
-    settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: 
optimize?
-    settings.setInt(ReplicatingSubscription::QPID_BACK,
-                    queue->getPosition());
-    settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
-                      brokerInfo.asFieldTable());
+    FieldTable arguments;
+    arguments.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 
1);
+    arguments.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: 
optimize?
+    arguments.setInt(ReplicatingSubscription::QPID_BACK, queue->getPosition());
+    
arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO,brokerInfo.asFieldTable());
     SequenceNumber front, back;
     queue->getRange(front, back, broker::REPLICATOR);
-    if (front <= back) settings.setInt(ReplicatingSubscription::QPID_FRONT, 
front);
+    if (front <= back) arguments.setInt(ReplicatingSubscription::QPID_FRONT, 
front);
     try {
         peer.getMessage().subscribe(
             args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
-            false/*exclusive*/, "", 0, settings);
-        // FIXME aconway 2012-05-22: use a finite credit window?
-        peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
-        peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
+            false/*exclusive*/, "", 0, arguments);
+        peer.getMessage().setFlowMode(getName(), 1); // Window
+        peer.getMessage().flow(getName(), 0, settings.getFlowMessages());
+        peer.getMessage().flow(getName(), 1, settings.getFlowBytes());
     }
     catch(const exception& e) {
         QPID_LOG(error, QPID_MSG(logPrefix + "Cannot connect to primary: " << 
e.what()));
@@ -190,7 +190,7 @@ void QueueReplicator::initializeBridge(B
     qpid::Address primary;
     link->getRemoteAddress(primary);
     QPID_LOG(info, logPrefix << "Connected to " << primary << "(" << 
bridgeName << ")");
-    QPID_LOG(trace, logPrefix << "Subscription settings: " << settings);
+    QPID_LOG(trace, logPrefix << "Subscription arguments: " << arguments);
 }
 
 namespace {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Oct 18 19:42:14 2012
@@ -41,6 +41,7 @@ class Deliverable;
 
 namespace ha {
 class HaBroker;
+class Settings;
 
 /**
  * Exchange created on a backup broker to replicate a queue on the primary.
@@ -57,6 +58,8 @@ class QueueReplicator : public broker::E
   public:
     static const std::string DEQUEUE_EVENT_KEY;
     static const std::string POSITION_EVENT_KEY;
+    static const std::string QPID_SYNC_FREQUENCY;
+
     static std::string replicatorName(const std::string& queueName);
     static bool isReplicatorName(const std::string&);
 
@@ -101,6 +104,7 @@ class QueueReplicator : public broker::E
     boost::shared_ptr<broker::Bridge> bridge;
     BrokerInfo brokerInfo;
     bool subscribed;
+    const Settings& settings;
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h?rev=1399813&r1=1399812&r2=1399813&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h Thu Oct 18 19:42:14 2012
@@ -23,6 +23,7 @@
  */
 
 #include "types.h"
+#include "qpid/sys/IntegerTypes.h"
 #include <string>
 
 namespace qpid {
@@ -34,7 +35,8 @@ namespace ha {
 class Settings
 {
   public:
-    Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5)
+    Settings() : cluster(false), replicateDefault(NONE), backupTimeout(5),
+                 flowMessages(100), flowBytes(0)
     {}
 
     bool cluster;               // True if we are a cluster member.
@@ -43,7 +45,13 @@ class Settings
     Enum<ReplicateLevel> replicateDefault;
     std::string username, password, mechanism;
     double backupTimeout;
-  private:
+
+    uint32_t flowMessages, flowBytes;
+
+    static const uint32_t NO_LIMIT=0xFFFFFFFF;
+    static uint32_t flowValue(uint32_t n) { return n ? n : NO_LIMIT; }
+    uint32_t getFlowMessages() const { return flowValue(flowMessages); }
+    uint32_t getFlowBytes() const { return flowValue(flowBytes); }
 };
 }} // namespace qpid::ha
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to