Author: aconway
Date: Mon May 28 18:24:43 2012
New Revision: 1343351

URL: http://svn.apache.org/viewvc?rev=1343351&view=rev
Log:
QPID-3603: Better log messages for HA code.

Identify host name of backup in ReplicatingSubscription logs.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.cpp Mon May 28 18:24:43 2012
@@ -31,6 +31,7 @@ namespace ha {
 namespace {
 std::string SYSTEM_ID="system-id";
 std::string HOST_NAME="host-name";
+std::string PORT="port";
 std::string STATUS="status";
 }
 
@@ -41,6 +42,7 @@ FieldTable BrokerInfo::asFieldTable() co
     FieldTable ft;
     ft.setString(SYSTEM_ID, systemId.str());
     ft.setString(HOST_NAME, hostName);
+    ft.setInt(PORT, port);
     ft.setInt(STATUS, status);
     return ft;
 }
@@ -48,11 +50,12 @@ FieldTable BrokerInfo::asFieldTable() co
 void BrokerInfo::assign(const FieldTable& ft) {
     systemId = Uuid(ft.getAsString(SYSTEM_ID));
     hostName = ft.getAsString(HOST_NAME);
+    port = ft.getAsInt(PORT);
     status = BrokerStatus(ft.getAsInt(STATUS));
 }
 
 std::ostream& operator<<(std::ostream& o, const BrokerInfo& b) {
-    return o << b.getHostName() << "(" << b.getSystemId()
+    return o << b.getHostName() << ":" << b.getPort() << "(" << b.getSystemId()
              << "," << printable(b.getStatus()) << ")";
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerInfo.h Mon May 28 18:24:43 2012
@@ -23,6 +23,7 @@
  */
 
 #include "Enum.h"
+#include "qpid/Url.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/framing/FieldTable.h"
 #include <string>
@@ -37,8 +38,8 @@ namespace ha {
 class BrokerInfo
 {
   public:
-    BrokerInfo(const std::string& host, const framing::Uuid& id) :
-        hostName(host), systemId(id) {}
+    BrokerInfo(const std::string& host, uint16_t port_, const framing::Uuid& 
id) :
+        hostName(host), port(port_), systemId(id) {}
 
     BrokerInfo(const framing::FieldTable& ft) { assign(ft); }
     framing::FieldTable asFieldTable() const;
@@ -47,10 +48,13 @@ class BrokerInfo
     framing::Uuid getSystemId() const { return systemId; }
     std::string getHostName() const { return hostName; }
     BrokerStatus getStatus() const { return status; }
+     uint16_t getPort() const { return port; }
+
     void setStatus(BrokerStatus s)  { status = s; }
 
   private:
     std::string hostName;
+    uint16_t port;
     framing::Uuid systemId;
     BrokerStatus status;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Mon May 28 18:24:43 
2012
@@ -233,7 +233,7 @@ void BrokerReplicator::initializeBridge(
     sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
     sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
     sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
-    QPID_LOG(debug, logPrefix << "opened configuration bridge: " << queueName);
+    QPID_LOG(debug, logPrefix << "Opened configuration bridge: " << queueName);
 }
 
 void BrokerReplicator::route(Deliverable& msg) {
@@ -271,7 +271,7 @@ void BrokerReplicator::route(Deliverable
             }
         }
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "configuration failed: " << e.what()
+        QPID_LOG(critical, logPrefix << "Configuration failed: " << e.what()
                  << ": while handling: " << list);
         throw;
     }
@@ -290,7 +290,7 @@ void BrokerReplicator::doEventQueueDecla
         // The queue was definitely created on the primary.
         if (broker.getQueues().find(name)) {
             broker.getQueues().destroy(name);
-            QPID_LOG(warning, logPrefix << "queue declare event, replaced 
exsiting: "
+            QPID_LOG(warning, logPrefix << "Queue declare event, replaced 
exsiting: "
                      << name);
         }
         std::pair<boost::shared_ptr<Queue>, bool> result =
@@ -304,7 +304,7 @@ void BrokerReplicator::doEventQueueDecla
                 values[USER].asString(),
                 values[RHOST].asString());
         assert(result.second);  // Should be true since we destroyed existing 
queue above
-        QPID_LOG(debug, logPrefix << "queue declare event: " << name);
+        QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
         startQueueReplicator(result.first);
     }
 }
@@ -323,9 +323,9 @@ void BrokerReplicator::doEventQueueDelet
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
     if (!queue) {
-        QPID_LOG(warning, logPrefix << "queue delete event, does not exist: " 
<< name);
+        QPID_LOG(warning, logPrefix << "Queue delete event, does not exist: " 
<< name);
     } else if (!haBroker.replicateLevel(queue->getSettings())) {
-        QPID_LOG(warning, logPrefix << "queue delete event, not replicated: " 
<< name);
+        QPID_LOG(warning, logPrefix << "Queue delete event, not replicated: " 
<< name);
     } else {
         boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
         if (qr) {
@@ -336,7 +336,7 @@ void BrokerReplicator::doEventQueueDelet
             broker.getExchanges().destroy(qr->getName());
         }
         broker.deleteQueue(name, values[USER].asString(), 
values[RHOST].asString());
-        QPID_LOG(debug, logPrefix << "queue delete event: " << name);
+        QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
     }
 }
 
@@ -351,7 +351,7 @@ void BrokerReplicator::doEventExchangeDe
         // The exchange was definitely created on the primary.
         if (broker.getExchanges().find(name)) {
             broker.getExchanges().destroy(name);
-            QPID_LOG(warning, logPrefix << "exchange declare event, replaced 
exsiting: " << name)
+            QPID_LOG(warning, logPrefix << "Exchange declare event, replaced 
exsiting: " << name)
                 }
         std::pair<boost::shared_ptr<Exchange>, bool> result =
             broker.createExchange(
@@ -363,7 +363,7 @@ void BrokerReplicator::doEventExchangeDe
                 values[USER].asString(),
                 values[RHOST].asString());
         assert(result.second);
-        QPID_LOG(debug, logPrefix << "exchange declare event: " << name);
+        QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
     }
 }
 
@@ -371,11 +371,11 @@ void BrokerReplicator::doEventExchangeDe
     string name = values[EXNAME].asString();
     boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
     if (!exchange) {
-        QPID_LOG(warning, logPrefix << "exchange delete event, does not exist: 
" << name);
+        QPID_LOG(warning, logPrefix << "Exchange delete event, does not exist: 
" << name);
     } else if (!haBroker.replicateLevel(exchange->getArgs())) {
-        QPID_LOG(warning, logPrefix << "exchange delete event, not replicated: 
" << name);
+        QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: 
" << name);
     } else {
-        QPID_LOG(debug, logPrefix << "exchange delete event:" << name);
+        QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
         broker.deleteExchange(
             name,
             values[USER].asString(),
@@ -397,7 +397,7 @@ void BrokerReplicator::doEventBind(Varia
         amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
         exchange->bind(queue, key, &args);
-        QPID_LOG(debug, logPrefix << "bind event: exchange=" << 
exchange->getName()
+        QPID_LOG(debug, logPrefix << "Bind event: exchange=" << 
exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
     }
@@ -417,7 +417,7 @@ void BrokerReplicator::doEventUnbind(Var
         amqp_0_10::translate(asMapVoid(values[ARGS]), args);
         string key = values[KEY].asString();
         exchange->unbind(queue, key, &args);
-        QPID_LOG(debug, logPrefix << "unbind event: exchange=" << 
exchange->getName()
+        QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << 
exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
     }
@@ -444,7 +444,7 @@ void BrokerReplicator::doResponseQueue(V
             ""/*TODO: what should we use as connection id?*/);
     // It is normal for the queue to already exist if we are failing over.
     if (result.second) startQueueReplicator(result.first);
-    QPID_LOG(debug, logPrefix << "queue response: " << name);
+    QPID_LOG(debug, logPrefix << "Queue response: " << name);
 }
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
@@ -461,9 +461,9 @@ void BrokerReplicator::doResponseExchang
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/).second)
     {
-        QPID_LOG(debug, logPrefix << "exchange response: " << 
values[NAME].asString());
+        QPID_LOG(debug, logPrefix << "Exchange response: " << 
values[NAME].asString());
     } else {
-        QPID_LOG(warning, logPrefix << "exchange response, already exists: " <<
+        QPID_LOG(warning, logPrefix << "Exchange response, already exists: " <<
                  values[NAME].asString());
     }
 }
@@ -503,7 +503,7 @@ void BrokerReplicator::doResponseBind(Va
         amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
         string key = values[KEY].asString();
         exchange->bind(queue, key, &args);
-        QPID_LOG(debug, logPrefix << "bind response: exchange=" << 
exchange->getName()
+        QPID_LOG(debug, logPrefix << "Bind response: exchange=" << 
exchange->getName()
                  << " queue=" << queue->getName()
                  << " key=" << key);
     }
@@ -545,7 +545,7 @@ void BrokerReplicator::startQueueReplica
 {
     if (haBroker.replicateLevel(queue->getSettings()) == ALL) {
         boost::shared_ptr<QueueReplicator> qr(
-            new QueueReplicator(LogPrefix(haBroker, queue->getName()), queue, 
link));
+            new QueueReplicator(haBroker, queue, link));
         if (!broker.getExchanges().registerExchange(qr))
             throw Exception(QPID_MSG("Duplicate queue replicator " << 
qr->getName()));
         qr->activate();

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Mon May 28 18:24:43 2012
@@ -55,6 +55,8 @@ HaBroker::HaBroker(broker::Broker& b, co
       status(STANDALONE),
       excluder(new ConnectionExcluder(logPrefix, 
broker.getSystem()->getSystemId())),
       brokerInfo(broker.getSystem()->getNodeName(),
+                 // TODO aconway 2012-05-24: other transports?
+                 broker.getPort(broker::Broker::TCP_TRANSPORT),
                  broker.getSystem()->getSystemId())
 
 {
@@ -180,8 +182,7 @@ Manageable::status_t HaBroker::Managemen
           boost::shared_ptr<broker::Link> link = result.first;
           link->setUrl(url);
           // Create a queue replicator
-          boost::shared_ptr<QueueReplicator> qr(
-              new QueueReplicator(LogPrefix(*this, queue->getName()), queue, 
link));
+          boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(*this, 
queue, link));
           qr->activate();
           broker.getExchanges().registerExchange(qr);
           break;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.cpp Mon May 28 18:24:43 2012
@@ -25,22 +25,26 @@
 namespace qpid {
 namespace ha {
 
-LogPrefix::LogPrefix(HaBroker& hb, const std::string& queue) : haBroker(&hb), 
status(0) {
-    if (queue.size()) tail = " queue " + queue;
+LogPrefix::LogPrefix(HaBroker& hb, const std::string& msg) : haBroker(&hb), 
status(0) {
+    if (msg.size()) setMessage(msg);
 }
 
-LogPrefix::LogPrefix(LogPrefix& lp, const std::string& queue)
+LogPrefix::LogPrefix(LogPrefix& lp, const std::string& msg)
   : haBroker(lp.haBroker), status(0)
 {
-    if (queue.size()) tail = " queue " + queue;
+    if (msg.size()) setMessage(msg);
 }
 
 LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {}
 
+void LogPrefix::setMessage(const std::string& msg) {
+    tail = " "+msg+":";
+}
+
 std::ostream& operator<<(std::ostream& o, const LogPrefix& l) {
     return o << "HA("
              << printable(l.status ? *l.status : l.haBroker->getStatus())
-             << ")" << l.tail << ": ";
+             << ")" << l.tail << " ";
 }
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/LogPrefix.h Mon May 28 18:24:43 2012
@@ -40,10 +40,11 @@ class LogPrefix
     /** For use by all classes other than HaBroker */
     LogPrefix(HaBroker& hb, const std::string& queue=std::string());
     LogPrefix(LogPrefix& lp, const std::string& queue);
-
     /** For use by the HaBroker itself. */
     LogPrefix(BrokerStatus&);
 
+    void setMessage(const std::string&);
+
   private:
     HaBroker* haBroker;
     BrokerStatus* status;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Mon May 28 18:24:43 2012
@@ -21,6 +21,7 @@
 
 #include "Counter.h"
 #include "QueueReplicator.h"
+#include "HaBroker.h"
 #include "ReplicatingSubscription.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
@@ -59,14 +60,15 @@ bool QueueReplicator::isEventKey(const s
     return ret;
 }
 
-QueueReplicator::QueueReplicator(const LogPrefix& lp,
+QueueReplicator::QueueReplicator(HaBroker& hb,
                                  boost::shared_ptr<Queue> q,
                                  boost::shared_ptr<Link> l)
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
-      logPrefix(lp), queue(q), link(l)
+      haBroker(hb), logPrefix(hb), queue(q), link(l)
 {
-    framing::Uuid uuid(true);
+    Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
+    logPrefix.setMessage(q->getName());
     QPID_LOG(info, logPrefix << "Created");
 }
 
@@ -109,12 +111,15 @@ void QueueReplicator::deactivate() {
 // Called in a broker connection thread when the bridge is created.
 void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& 
sessionHandler) {
     sys::Mutex::ScopedLock l(lock);
-    framing::AMQP_ServerProxy peer(sessionHandler.out);
+    AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& 
args(bridge.getArgs());
-    framing::FieldTable settings;
+    FieldTable settings;
     settings.setInt(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, 1);
     settings.setInt(QPID_SYNC_FREQUENCY, 1); // FIXME aconway 2012-05-22: 
optimize?
-    settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER, 
queue->getPosition());
+    settings.setInt(ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER,
+                    queue->getPosition());
+    settings.setTable(ReplicatingSubscription::QPID_BROKER_INFO,
+                      haBroker.getBrokerInfo().asFieldTable());
     SequenceNumber front;
     if (ReplicatingSubscription::getFront(*queue, front))
         settings.setInt(ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER, 
front);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Mon May 28 18:24:43 2012
@@ -42,6 +42,7 @@ class Deliverable;
 namespace ha {
 
 class Counter;
+class HaBroker;
 
 /**
  * Exchange created on a backup broker to replicate a queue on the primary.
@@ -62,7 +63,7 @@ class QueueReplicator : public broker::E
     /** Test if a string is an event key */
     static bool isEventKey(const std::string key);
 
-    QueueReplicator(const LogPrefix&,
+    QueueReplicator(HaBroker&,
                     boost::shared_ptr<broker::Queue> q,
                     boost::shared_ptr<broker::Link> l);
 
@@ -82,6 +83,7 @@ class QueueReplicator : public broker::E
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& 
sessionHandler);
     void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
 
+    HaBroker& haBroker;
     LogPrefix logPrefix;
     std::string bridgeName;
     sys::Mutex lock;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon May 28 
18:24:43 2012
@@ -20,6 +20,7 @@
  */
 
 #include "ReplicatingSubscription.h"
+#include "HaBroker.h"
 #include "Primary.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/SessionContext.h"
@@ -40,6 +41,7 @@ using namespace std;
 const string 
ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.replicating-subscription");
 const string 
ReplicatingSubscription::QPID_HIGH_SEQUENCE_NUMBER("qpid.high-sequence-number");
 const string 
ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low-sequence-number");
+const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.broker-info");
 
 namespace {
 const string DOLLAR("$");
@@ -59,7 +61,7 @@ class DequeueRemover
     }
 
     void operator()(const QueuedMessage& message) {
-        if (message.position >= start && message.position <= end) {
+ if (message.position >= start && message.position <= end) {
             //i.e. message is within the intial range and has not been 
dequeued,
             //so remove it from the dequeues
             dequeues.remove(message.position);
@@ -127,7 +129,7 @@ ReplicatingSubscription::Factory::create
     boost::shared_ptr<ReplicatingSubscription> rs;
     if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
         rs.reset(new ReplicatingSubscription(
-                     LogPrefix(haBroker),
+                     haBroker,
                      parent, name, queue, ack, acquire, exclusive, tag,
                      resumeId, resumeTtl, arguments));
         queue->addObserver(rs);
@@ -173,7 +175,7 @@ ostream& operator<<(ostream& o, const Qu
 }
 
 ReplicatingSubscription::ReplicatingSubscription(
-    LogPrefix lp,
+    HaBroker& hb,
     SemanticState* parent,
     const string& name,
     Queue::shared_ptr queue,
@@ -186,17 +188,24 @@ ReplicatingSubscription::ReplicatingSubs
     const framing::FieldTable& arguments
 ) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
                  resumeId, resumeTtl, arguments),
-    logPrefix(lp, queue->getName()),
+    haBroker(hb),
+    logPrefix(hb),
     dummy(new Queue(mask(name))),
     ready(false)
 {
     try {
-        // FIXME aconway 2012-05-22: use hostname from brokerinfo
-        // Separate the remote part from a "local-remote" address for logging.
-        string address = parent->getSession().getConnection().getUrl();
-        size_t i = address.find('-');
-        if (i != string::npos) address = address.substr(i+1);
-        logSuffix = " (" + address + ")";
+        // Set a log prefix message that identifies the remote broker.
+        // FIXME aconway 2012-05-24: use URL instead of host:port, include 
transport?
+        ostringstream os;
+        os << queue->getName() << "@";
+        FieldTable ft;
+        if (arguments.getTable(ReplicatingSubscription::QPID_BROKER_INFO, ft)) 
{
+            BrokerInfo info(ft);
+            os << info.getHostName() << ":" << info.getPort();
+        }
+        else
+            os << parent->getSession().getConnection().getUrl();
+        logPrefix.setMessage(os.str());
 
         QueueRange primary(*queue);
         QueueRange backup(arguments);
@@ -228,16 +237,16 @@ ReplicatingSubscription::ReplicatingSubs
                  << " backup range " << backup
                  << " primary range " << primary
                  << " position " << position
-                 << " dequeues " << dequeues << logSuffix);
+                 << " dequeues " << dequeues);
     }
     catch (const std::exception& e) {
         throw Exception(QPID_MSG(logPrefix << "Error setting up replication: "
-                                 << e.what() << logSuffix));
+                                 << e.what()));
     }
 }
 
 ReplicatingSubscription::~ReplicatingSubscription() {
-    QPID_LOG(debug, logPrefix << "Detroyed replicating subscription" << 
logSuffix);
+    QPID_LOG(debug, logPrefix << "Detroyed replicating subscription");
 }
 
 // Called in subscription's connection thread when the subscription is created.
@@ -258,7 +267,7 @@ void ReplicatingSubscription::initialize
     }
     else {
         QPID_LOG(debug, logPrefix << "Backup subscription catching up from "
-                 << position << " to " << readyPosition << logSuffix);
+                 << position << " to " << readyPosition);
     }
 }
 
@@ -267,7 +276,7 @@ bool ReplicatingSubscription::deliver(Qu
     try {
         // Add position events for the subscribed queue, not the internal 
event queue.
         if (qm.queue == getQueue().get()) {
-            QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix);
+            QPID_LOG(trace, logPrefix << "Replicating " << qm);
             {
                 sys::Mutex::ScopedLock l(lock);
                 assert(position == qm.position);
@@ -296,8 +305,8 @@ bool ReplicatingSubscription::deliver(Qu
         else
             return ConsumerImpl::deliver(qm); // Message is for internal event 
queue.
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "error replicating " << qm
-                 << logSuffix << ": " << e.what());
+        QPID_LOG(critical, logPrefix << "Error replicating " << qm
+                 << ": " << e.what());
         throw;
     }
 }
@@ -305,7 +314,7 @@ bool ReplicatingSubscription::deliver(Qu
 void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) {
     if (ready) return;
     ready = true;
-    QPID_LOG(info, logPrefix << "Caught up at " << getPosition() << logSuffix);
+    QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
     // Notify Primary that a subscription is ready.
     if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName());
 }
@@ -319,7 +328,7 @@ void ReplicatingSubscription::complete(
 {
     // Handle completions for the subscribed queue, not the internal event 
queue.
     if (qm.queue == getQueue().get()) {
-        QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
+        QPID_LOG(trace, logPrefix << "Completed " << qm);
         Delayed::iterator i= delayed.find(qm.position);
         // The same message can be completed twice, by acknowledged and
         // dequeued, remove it from the set so it only gets completed
@@ -337,7 +346,7 @@ void ReplicatingSubscription::complete(
 // Called in arbitrary connection thread *with the queue lock held*
 void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
     // Delay completion
-    QPID_LOG(trace, logPrefix << "delaying completion of " << qm << logSuffix);
+    QPID_LOG(trace, logPrefix << "Delaying completion of " << qm);
     qm.payload->getIngressCompletion().startCompleter();
     {
         sys::Mutex::ScopedLock l(lock);
@@ -350,7 +359,7 @@ void ReplicatingSubscription::enqueued(c
 void ReplicatingSubscription::cancelComplete(
     const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
 {
-    QPID_LOG(trace, logPrefix << "cancel completed " << v.second << logSuffix);
+    QPID_LOG(trace, logPrefix << "Cancel completed " << v.second);
     v.second.payload->getIngressCompletion().finishCompleter();
 }
 
@@ -361,8 +370,8 @@ void ReplicatingSubscription::cancel()
         boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
     {
         sys::Mutex::ScopedLock l(lock);
-        QPID_LOG(debug, logPrefix << "cancel backup subscription to "
-                 << getQueue()->getName() << logSuffix);
+        QPID_LOG(debug, logPrefix << "Cancel backup subscription to "
+                 << getQueue()->getName());
         for_each(delayed.begin(), delayed.end(),
                  boost::bind(&ReplicatingSubscription::cancelComplete, this, 
_1, boost::ref(l)));
         delayed.clear();
@@ -385,8 +394,7 @@ bool ReplicatingSubscription::hideDelete
 void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
 {
     if (dequeues.empty()) return;
-    QPID_LOG(trace, logPrefix << "sending dequeues " << dequeues
-             << " from " << getQueue()->getName() << logSuffix);
+    QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
     string buf(dequeues.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     dequeues.encode(buffer);
@@ -401,7 +409,7 @@ void ReplicatingSubscription::sendDequeu
 void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
 {
     {
-        QPID_LOG(trace, logPrefix << "dequeued " << qm << logSuffix);
+        QPID_LOG(trace, logPrefix << "Dequeued " << qm);
         sys::Mutex::ScopedLock l(lock);
         dequeues.add(qm.position);
         // If we have not yet sent this message to the backup, then
@@ -415,8 +423,8 @@ void ReplicatingSubscription::dequeued(c
 void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const 
sys::Mutex::ScopedLock&)
 {
     if (pos == backupPosition) return; // No need to send.
-    QPID_LOG(trace, logPrefix << "sending position " << pos
-             << ", was " << backupPosition << logSuffix);
+    QPID_LOG(trace, logPrefix << "Sending position " << pos
+             << ", was " << backupPosition);
     string buf(pos.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     pos.encode(buffer);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1343351&r1=1343350&r2=1343351&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon May 28 
18:24:43 2012
@@ -76,8 +76,9 @@ class ReplicatingSubscription : public b
     static const std::string QPID_REPLICATING_SUBSCRIPTION;
     static const std::string QPID_HIGH_SEQUENCE_NUMBER;
     static const std::string QPID_LOW_SEQUENCE_NUMBER;
+    static const std::string QPID_BROKER_INFO;
 
-    // FIXME aconway 2012-05-23: these don't belong on ReplicatingSubscription
+    // TODO aconway 2012-05-23: these don't belong on ReplicatingSubscription
     /** Get position of front message on queue.
      *@return false if queue is empty.
      */
@@ -88,7 +89,7 @@ class ReplicatingSubscription : public b
     static bool getNext(broker::Queue&, framing::SequenceNumber from,
                         framing::SequenceNumber& result);
 
-    ReplicatingSubscription(LogPrefix,
+    ReplicatingSubscription(HaBroker&,
                             broker::SemanticState* parent,
                             const std::string& name, 
boost::shared_ptr<broker::Queue> ,
                             bool ack, bool acquire, bool exclusive, const 
std::string& tag,
@@ -120,8 +121,8 @@ class ReplicatingSubscription : public b
   private:
     typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
 
+    HaBroker&  haBroker;
     LogPrefix logPrefix;
-    std::string logSuffix;
     boost::shared_ptr<broker::Queue> dummy; // Used to send event messages
     Delayed delayed;
     framing::SequenceSet dequeues;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to