Author: aconway
Date: Thu Jan 19 23:04:44 2012
New Revision: 1233652

URL: http://svn.apache.org/viewvc?rev=1233652&view=rev
Log:
QPID-3603: Cleanup of HA log messages.

Added:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp   (with props)
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h   (with props)
Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/ha.mk Thu Jan 19 23:04:44 2012
@@ -28,6 +28,8 @@ ha_la_SOURCES =                                       \
   qpid/ha/HaBroker.cpp                         \
   qpid/ha/HaBroker.h                           \
   qpid/ha/HaPlugin.cpp                         \
+  qpid/ha/Logging.h                            \
+  qpid/ha/Logging.cpp                          \
   qpid/ha/Settings.h                           \
   qpid/ha/QueueReplicator.h                    \
   qpid/ha/QueueReplicator.cpp                  \

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Backup.cpp Thu Jan 19 
23:04:44 2012
@@ -46,7 +46,7 @@ Backup::Backup(broker::Broker& b, const 
     // FIXME aconway 2011-11-24: identifying the primary.
     if (s.brokerUrl != "primary") { // FIXME aconway 2011-11-22: temporary 
hack to identify primary.
         Url url(s.brokerUrl);
-        QPID_LOG(info, "HA: Acting as backup to " << url);
+        QPID_LOG(info, "HA: Acting as backup");
         string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
 
         // FIXME aconway 2011-11-17: TBD: link management, discovery, 
fail-over.

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Jan 19 
23:04:44 2012
@@ -59,8 +59,8 @@ HaBroker::HaBroker(broker::Broker& b, co
         mgmtObject->set_status("solo");
         ma->addObject(mgmtObject);
     }
-    QPID_LOG(notice, "HA: broker initialized, client-url=" << clientUrl
-             << ", broker-url=" << brokerUrl);
+    QPID_LOG(notice, "HA: Initialized: client-url=" << clientUrl
+             << " broker-url=" << brokerUrl);
     backup.reset(new Backup(broker, s));
     // Register a factory for replicating subscriptions.
     broker.getConsumerFactories().add(

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/HaPlugin.cpp Thu Jan 19 
23:04:44 2012
@@ -56,7 +56,6 @@ struct HaPlugin : public Plugin {
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
         if (broker && settings.enabled) {
-            QPID_LOG(info, "HA: Enabled");
             haBroker.reset(new ha::HaBroker(*broker, settings));
         } else
             QPID_LOG(info, "HA: Disabled");

Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp?rev=1233652&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp Thu Jan 19 
23:04:44 2012
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Logging.h"
+#include "qpid/broker/QueuedMessage.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/framing/SequenceNumber.h"
+
+namespace qpid {
+namespace ha {
+
+QueuePos::QueuePos(const broker::QueuedMessage& qm)
+    : queue(qm.queue), position(qm.position) {}
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& qp) {
+    return o << qp.queue->getName() << "[" << qp.position << "]";
+}
+
+}} // namespace qpid::ha

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h?rev=1233652&view=auto
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h (added)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h Thu Jan 19 
23:04:44 2012
@@ -0,0 +1,55 @@
+#ifndef QPID_HA_HAOSTREAM_H
+#define QPID_HA_HAOSTREAM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <iosfwd>
+
+/**@file ostream helpers used in log messages. */
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+class QueuedMessage;
+}
+
+namespace framing {
+class SequenceNumber;
+}
+
+namespace ha {
+
+// Other printable helpers
+
+struct QueuePos {
+    const broker::Queue* queue;
+    const framing::SequenceNumber& position;
+    QueuePos(const broker::Queue* q, const framing::SequenceNumber& pos)
+        : queue(q), position(pos) {}
+    QueuePos(const broker::QueuedMessage& qm);
+};
+
+std::ostream& operator<<(std::ostream& o, const QueuePos& h);
+
+}} // namespace qpid::ha
+
+#endif  /*!QPID_HA_HAOSTREAM_H*/

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/Logging.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan 
19 23:04:44 2012
@@ -21,6 +21,7 @@
 
 #include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
+#include "Logging.h"
 #include "qpid/broker/Bridge.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Link.h"
@@ -82,7 +83,7 @@ void QueueReplicator::initializeBridge(B
     peer.getMessage().subscribe(args.i_src, args.i_dest, args.i_sync ? 0 : 1, 
0, false, "", 0, settings);
     peer.getMessage().flow(getName(), 0, 0xFFFFFFFF);
     peer.getMessage().flow(getName(), 1, 0xFFFFFFFF);
-    QPID_LOG(debug, "Activated route from queue " << args.i_src << " to " << 
args.i_dest);
+    QPID_LOG(debug, "HA: Activated route from queue " << args.i_src << " to " 
<< args.i_dest);
 }
 
 
@@ -105,14 +106,17 @@ void QueueReplicator::route(Deliverable&
             if (current < *i) {
                 //haven't got that far yet, record the dequeue
                 dequeued.add(*i);
-                QPID_LOG(debug, "Recording dequeue of message at " << *i << " 
from " << queue->getName());
+                QPID_LOG(trace, "HA: Recording dequeue of message at " <<
+                         QueuePos(queue.get(), *i));
             } else {
                 QueuedMessage message;
                 if (queue->acquireMessageAt(*i, message)) {
                     queue->dequeue(0, message);
-                    QPID_LOG(info, "Dequeued message at " << *i << " from " << 
queue->getName());
+                    QPID_LOG(info, "HA: Dequeued message "<< 
QueuePos(message));
                 } else {
-                    QPID_LOG(error, "Unable to dequeue message at " << *i << " 
from " << queue->getName());
+                    // FIXME aconway 2011-11-29: error handling
+                    QPID_LOG(error, "HA: Unable to dequeue message at "
+                             << QueuePos(queue.get(), *i));
                 }
             }
         }
@@ -121,10 +125,10 @@ void QueueReplicator::route(Deliverable&
         //dequeued before our subscription reached them
         while (dequeued.contains(++current)) {
             dequeued.remove(current);
-            QPID_LOG(debug, "Skipping dequeued message at " << current << " 
from " << queue->getName());
+            QPID_LOG(debug, "HA: Skipping dequeued message at " << current << 
" from " << queue->getName());
             queue->setPosition(current);
         }
-        QPID_LOG(info, "Enqueued message on " << queue->getName() << "; 
currently at " << current);
+        QPID_LOG(info, "HA: Enqueued message on " << queue->getName() << "; 
currently at " << current);
         msg.deliverTo(queue);
     }
 }

Modified: 
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
Thu Jan 19 23:04:44 2012
@@ -20,6 +20,7 @@
  */
 
 #include "ReplicatingSubscription.h"
+#include "Logging.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/MessageTransferBody.h"
@@ -39,7 +40,7 @@ const string ReplicatingSubscription::QP
 const string 
ReplicatingSubscription::QPID_LOW_SEQUENCE_NUMBER("qpid.low_sequence_number");
 
 const string DOLLAR("$");
-const string INTERNAL("_internal");
+const string INTERNAL("-internal");
 
 class ReplicationStateInitialiser
 {
@@ -55,7 +56,7 @@ class ReplicationStateInitialiser
     void operator()(const QueuedMessage& message) {
         if (message.position < start) {
             //replica does not have a message that should still be on the queue
-            QPID_LOG(warning, "Replica appears to be missing message at " << 
message.position);
+            QPID_LOG(warning, "HA: Replica missing message " << 
QueuePos(message));
         } else if (message.position >= start && message.position <= end) {
             //i.e. message is within the intial range and has not been 
dequeued, so remove it from the results
             results.remove(message.position);
@@ -75,63 +76,63 @@ string mask(const string& in)
 
 boost::shared_ptr<broker::SemanticState::ConsumerImpl>
 ReplicatingSubscription::Factory::create(
-    SemanticState* _parent,
-    const string& _name,
-    Queue::shared_ptr _queue,
+    SemanticState* parent,
+    const string& name,
+    Queue::shared_ptr queue,
     bool ack,
-    bool _acquire,
-    bool _exclusive,
-    const string& _tag,
-    const string& _resumeId,
-    uint64_t _resumeTtl,
-    const framing::FieldTable& _arguments
+    bool acquire,
+    bool exclusive,
+    const string& tag,
+    const string& resumeId,
+    uint64_t resumeTtl,
+    const framing::FieldTable& arguments
 ) {
-
     return boost::shared_ptr<broker::SemanticState::ConsumerImpl>(
-        new ReplicatingSubscription(_parent, _name, _queue, ack, _acquire, 
_exclusive, _tag, _resumeId, _resumeTtl, _arguments));
+        new ReplicatingSubscription(parent, name, queue, ack, acquire, 
exclusive, tag, resumeId, resumeTtl, arguments));
 }
 
 ReplicatingSubscription::ReplicatingSubscription(
-    SemanticState* _parent,
-    const string& _name,
-    Queue::shared_ptr _queue,
+    SemanticState* parent,
+    const string& name,
+    Queue::shared_ptr queue,
     bool ack,
-    bool _acquire,
-    bool _exclusive,
-    const string& _tag,
-    const string& _resumeId,
-    uint64_t _resumeTtl,
-    const framing::FieldTable& _arguments
-) : ConsumerImpl(_parent, _name, _queue, ack, _acquire, _exclusive, _tag, 
_resumeId, _resumeTtl, _arguments),
-    events(new Queue(mask(_name))),
+    bool acquire,
+    bool exclusive,
+    const string& tag,
+    const string& resumeId,
+    uint64_t resumeTtl,
+    const framing::FieldTable& arguments
+) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
+                 resumeId, resumeTtl, arguments),
+    events(new Queue(mask(name))),
     consumer(new DelegatingConsumer(*this))
 {
+    QPID_LOG(debug, "HA: Replicating subscription " << name << " to " << 
queue->getName());
     // FIXME aconway 2011-11-25: string constants.
-    QPID_LOG(debug, "HA: replicating subscription " << _name << " to " << 
_queue->getName());
-    if (_arguments.isSet("qpid.high_sequence_number")) {
-        qpid::framing::SequenceNumber hwm = 
_arguments.getAsInt("qpid.high_sequence_number");
+    if (arguments.isSet("qpid.high_sequence_number")) {
+        qpid::framing::SequenceNumber hwm = 
arguments.getAsInt("qpid.high_sequence_number");
         qpid::framing::SequenceNumber lwm;
-        if (_arguments.isSet("qpid.low_sequence_number")) {
-            lwm = _arguments.getAsInt("qpid.low_sequence_number");
+        if (arguments.isSet("qpid.low_sequence_number")) {
+            lwm = arguments.getAsInt("qpid.low_sequence_number");
         } else {
             lwm = hwm;
         }
         qpid::framing::SequenceNumber oldest;
-        if (_queue->getOldest(oldest)) {
+        if (queue->getOldest(oldest)) {
             if (oldest >= hwm) {
                 range.add(lwm, --oldest);
             } else if (oldest >= lwm) {
                 ReplicationStateInitialiser initialiser(range, lwm, hwm);
-                _queue->eachMessage(initialiser);
+                queue->eachMessage(initialiser);
             } else { //i.e. have older message on master than is reported to 
exist on replica
-                QPID_LOG(warning, "Replica appears to be missing message on 
master");
+                QPID_LOG(warning, "HA: Replica  missing message on master");
             }
         } else {
             //local queue (i.e. master) is empty
-            range.add(lwm, _queue->getPosition());
+            range.add(lwm, queue->getPosition());
         }
-        QPID_LOG(debug, "Initial set of dequeues for " << _queue->getName() << 
" are " << range
-                 << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << 
_queue->getPosition() << ")");
+        QPID_LOG(debug, "HA: Initial set of dequeues for " << queue->getName() 
<< " are " << range
+                 << " (lwm=" << lwm << ", hwm=" << hwm << ", current=" << 
queue->getPosition() << ")");
         //set position of 'cursor'
         position = hwm;
     }
@@ -142,11 +143,6 @@ bool ReplicatingSubscription::deliver(Qu
     return ConsumerImpl::deliver(m);
 }
 
-void ReplicatingSubscription::init()
-{
-    
getQueue()->addObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
-}
-
 void ReplicatingSubscription::cancel()
 {
     
getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
@@ -158,10 +154,9 @@ ReplicatingSubscription::~ReplicatingSub
 //under the message lock in the queue
 void ReplicatingSubscription::enqueued(const QueuedMessage& m)
 {
-    QPID_LOG(debug, "Enqueued message at " << m.position);
+    QPID_LOG(trace, "HA: Enqueued message " << QueuePos(m));
     //delay completion
     m.payload->getIngressCompletion().startCompleter();
-    QPID_LOG(debug, "Delayed " << m.payload.get());
 }
 
 void ReplicatingSubscription::generateDequeueEvent()
@@ -203,12 +198,13 @@ void ReplicatingSubscription::dequeued(c
     {
         sys::Mutex::ScopedLock l(lock);
         range.add(m.position);
-        QPID_LOG(debug, "Updated dequeue event to include message at " << 
m.position << "; subscription is at " << position);
+        // FIXME aconway 2011-11-29: q[pos]
+        QPID_LOG(trace, "HA: Updated dequeue event to include " << QueuePos(m) 
<< "; subscription is at " << position);
     }
     notify();
     if (m.position > position) {
         m.payload->getIngressCompletion().finishCompleter();
-        QPID_LOG(debug, "Completed " << m.payload.get() << " early due to 
dequeue");
+        QPID_LOG(trace, "HA: Completed " << QueuePos(m) << " early due to 
dequeue");
     }
 }
 
@@ -237,5 +233,4 @@ bool ReplicatingSubscription::Delegating
 void ReplicatingSubscription::DelegatingConsumer::cancel() {}
 OwnershipToken* ReplicatingSubscription::DelegatingConsumer::getSession() { 
return delegate.getSession(); }
 
-
 }} // namespace qpid::broker

Modified: 
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 
Thu Jan 19 23:04:44 2012
@@ -71,7 +71,6 @@ class ReplicatingSubscription : public b
 
     ~ReplicatingSubscription();
 
-    void init();
     void cancel();
     bool deliver(broker::QueuedMessage& msg);
     void enqueued(const broker::QueuedMessage&);

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Jan 
19 23:04:44 2012
@@ -213,7 +213,7 @@ void WiringReplicator::initializeBridge(
     sendQuery(QUEUE, queueName, sessionHandler);
     sendQuery(EXCHANGE, queueName, sessionHandler);
     sendQuery(BINDING, queueName, sessionHandler);
-    QPID_LOG(debug, "Activated wiring replicator")
+    QPID_LOG(debug, "HA: Activated wiring replicator")
 }
 
 void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const 
framing::FieldTable* headers) {
@@ -227,10 +227,10 @@ void WiringReplicator::route(Deliverable
 
         if (headers->getAsString(QMF_CONTENT) == EVENT) {
             for (Variant::List::iterator i = list.begin(); i != list.end(); 
++i) {
-                Variant::Map& map = list.front().asMap();
+                Variant::Map& map = i->asMap();
                 Variant::Map& schema = map[SCHEMA_ID].asMap();
                 Variant::Map& values = map[VALUES].asMap();
-                QPID_LOG(trace, "HA: Configuration event from primary: " << 
values);
+                QPID_LOG(trace, "HA: Configuration event: schema=" << schema 
<< " values=" << values);
                 if      (match<EventQueueDeclare>(schema)) 
doEventQueueDeclare(values);
                 else if (match<EventQueueDelete>(schema)) 
doEventQueueDelete(values);
                 else if (match<EventExchangeDeclare>(schema)) 
doEventExchangeDeclare(values);
@@ -246,7 +246,7 @@ void WiringReplicator::route(Deliverable
                 Variant::Map& values = i->asMap()[VALUES].asMap();
                 framing::FieldTable args;
                 amqp_0_10::translate(values[ARGUMENTS].asMap(), args);
-                QPID_LOG(trace, "HA: Configuration response from primary: " << 
values);
+                QPID_LOG(trace, "HA: Configuration response type=" << type << 
" values=" << values);
                 if      (type == QUEUE) doResponseQueue(values);
                 else if (type == EXCHANGE) doResponseExchange(values);
                 else if (type == BINDING) doResponseBind(values);
@@ -284,6 +284,7 @@ void WiringReplicator::doEventQueueDecla
             // re-create from event.
             // Events are always up to date, whereas responses may be
             // out of date.
+            QPID_LOG(debug, "HA: New queue replica " << name);
             startQueueReplicator(result.first);
         } else {
             QPID_LOG(warning, "HA: Replicated queue " << name << " already 
exists");
@@ -309,7 +310,7 @@ void WiringReplicator::doEventExchangeDe
         string name = values[EXNAME].asString();
         framing::FieldTable args;
         amqp_0_10::translate(argsMap, args);
-        QPID_LOG(debug, "HA: Creating exchange from event " << name);
+        QPID_LOG(debug, "HA: New exchange replica " << name);
         if (!broker.createExchange(
                 name,
                 values[EXTYPE].asString(),
@@ -320,7 +321,7 @@ void WiringReplicator::doEventExchangeDe
                 values[RHOST].asString()).second) {
             // FIXME aconway 2011-11-22: should delete pre-exisitng exchange
             // and re-create from event. See comment in doEventQueueDeclare.
-            QPID_LOG(warning, "Replicated exchange " << name << " already 
exists");
+            QPID_LOG(warning, "HA: Replicated exchange " << name << " already 
exists");
         }
     }
 }
@@ -348,7 +349,7 @@ void WiringReplicator::doEventBind(Varia
             framing::FieldTable args;
             amqp_0_10::translate(values[ARGS].asMap(), args);
             string key = values[KEY].asString();
-            QPID_LOG(debug, "Replicated binding exchange=" << 
exchange->getName()
+            QPID_LOG(debug, "HA: Replicated binding exchange=" << 
exchange->getName()
                      << " queue=" << queue->getName()
                      << " key=" << key);
             exchange->bind(queue, key, &args);
@@ -363,7 +364,6 @@ void WiringReplicator::doResponseQueue(V
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
     string name(values[NAME].asString());
-    QPID_LOG(debug, "Creating replicated queue " << values[NAME].asString() << 
" (in catch-up)");
     std::pair<boost::shared_ptr<Queue>, bool> result =
         broker.createQueue(
             name,
@@ -375,11 +375,12 @@ void WiringReplicator::doResponseQueue(V
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/);
     if (result.second) {
+        QPID_LOG(debug, "HA: New queue replica: " << values[NAME] << " (in 
catch-up)");
         startQueueReplicator(result.first);
     } else {
         // FIXME aconway 2011-11-22: Normal to find queue already
         // exists if we're failing over.
-        QPID_LOG(warning, "Replicated queue " << values[NAME] << " already 
exists (in catch-up)");
+        QPID_LOG(warning, "HA: Replicated queue " << values[NAME] << " already 
exists (in catch-up)");
     }
 }
 
@@ -388,7 +389,7 @@ void WiringReplicator::doResponseExchang
     if (!replicateLevel(argsMap)) return;
     framing::FieldTable args;
     amqp_0_10::translate(argsMap, args);
-    QPID_LOG(debug, "Creating replicated exchange " << values[NAME].asString() 
<< " (in catch-up)");
+    QPID_LOG(debug, "HA: New exchange replica " << values[NAME] << " (in 
catch-up)");
     if (!broker.createExchange(
             values[NAME].asString(),
             values[TYPE].asString(),
@@ -397,7 +398,7 @@ void WiringReplicator::doResponseExchang
             args,
             ""/*TODO: who is the user?*/,
             ""/*TODO: what should we use as connection id?*/).second) {
-        QPID_LOG(warning, "Replicated exchange " << values[QNAME] << " already 
exists (in catch-up)");
+        QPID_LOG(warning, "HA: Replicated exchange " << values[QNAME] << " 
already exists (in catch-up)");
     }
 }
 

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1233652&r1=1233651&r2=1233652&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Thu Jan 19 
23:04:44 2012
@@ -39,10 +39,6 @@ class ShortTests(BrokerTest):
                                   ] + args,
                       **kwargs)
 
-    def setup_wiring(self, primary, backup):
-        cmd="qpid-route route add %s %s qpid.node-cloner x"%(backup, primary)
-        self.assertEqual(0, os.system(cmd))
-
     # FIXME aconway 2011-11-15: work around async replication.
     def wait(self, session, address):
         def check():



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to