Author: aconway
Date: Thu Jan 19 23:08:02 2012
New Revision: 1233678

URL: http://svn.apache.org/viewvc?rev=1233678&view=rev
Log:
QPID-3603: HA logging improvements.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp?rev=1233678&r1=1233677&r2=1233678&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/broker/Link.cpp Thu Jan 19 
23:08:02 2012
@@ -249,7 +249,6 @@ void Link::ioThreadProcessing()
 
     if (state != STATE_OPERATIONAL)
         return;
-    QPID_LOG(debug, "Link::ioThreadProcessing()");
 
     // check for bridge session errors and recover
     if (!active.empty()) {

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1233678&r1=1233677&r2=1233678&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Jan 
19 23:08:02 2012
@@ -55,7 +55,7 @@ QueueReplicator::QueueReplicator(boost::
     : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), 
link(l)
 {
     std::stringstream ss;
-    ss << "HA: Backup queue " << queue->getName() << ": ";
+    ss << "HA: Backup " << queue->getName() << ": ";
     logPrefix = ss.str();
     QPID_LOG(info, logPrefix << "Created, settings: " << q->getSettings());
 }
@@ -133,12 +133,10 @@ template <class T> T decodeContent(Messa
 
 void QueueReplicator::dequeue(SequenceNumber n,  const 
sys::Mutex::ScopedLock&) {
     // Thread safe: only calls thread safe Queue functions.
-    if (queue->getPosition() >= n) { // Ignore dequeus we  haven't reached yet
+    if (queue->getPosition() >= n) { // Ignore messages we  haven't reached yet
         QueuedMessage message;
-        if (queue->acquireMessageAt(n, message)) {
+        if (queue->acquireMessageAt(n, message))
             queue->dequeue(0, message);
-            QPID_LOG(trace, logPrefix << "Dequeued message "<< 
message.position);
-        }
     }
 }
 
@@ -148,13 +146,13 @@ void QueueReplicator::route(Deliverable&
     sys::Mutex::ScopedLock l(lock);
     if (key == DEQUEUE_EVENT_KEY) {
         SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
-        QPID_LOG(trace, logPrefix << "Dequeue update: " << dequeues);
+        QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
         //TODO: should be able to optimise the following
         for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); 
i++)
             dequeue(*i, l);
     } else if (key == POSITION_EVENT_KEY) {
         SequenceNumber position = 
decodeContent<SequenceNumber>(msg.getMessage());
-        QPID_LOG(trace, logPrefix << "Position update: from " << 
queue->getPosition()
+        QPID_LOG(trace, logPrefix << "Position moved from " << 
queue->getPosition()
                  << " to " << position);
         assert(queue->getPosition() <= position);
          //TODO aconway 2011-12-14: Optimize this?

Modified: 
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1233678&r1=1233677&r2=1233678&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp 
Thu Jan 19 23:08:02 2012
@@ -47,6 +47,7 @@ string mask(const string& in)
     return DOLLAR + in + INTERNAL;
 }
 
+/* Called by SemanticState::consume to create a consumer */
 boost::shared_ptr<broker::SemanticState::ConsumerImpl>
 ReplicatingSubscription::Factory::create(
     SemanticState* parent,
@@ -122,8 +123,6 @@ bool ReplicatingSubscription::deliver(Qu
             SequenceNumber send(m.position);
             --send;   // Send the position before m was enqueued.
             sendPositionEvent(send, l);
-            QPID_LOG(trace, logPrefix << "Sending position " << send
-                     << ", was " << backupPosition);
         }
         backupPosition = m.position;
         QPID_LOG(trace, logPrefix << "Replicating message " << m.position);
@@ -137,13 +136,14 @@ ReplicatingSubscription::~ReplicatingSub
 void ReplicatingSubscription::cancel()
 {
     QPID_LOG(debug, logPrefix <<"Cancelled backup subscription " << getName());
-    
getQueue()->removeObserver(boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+    getQueue()->removeObserver(
+        boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
+    ConsumerImpl::cancel();
 }
 
 // Called before we get notified of the message being available and
 // under the message lock in the queue. Called in arbitrary connection thread.
-void ReplicatingSubscription::enqueued(const QueuedMessage& m)
-{
+void ReplicatingSubscription::enqueued(const QueuedMessage& m) {
     //delay completion
     m.payload->getIngressCompletion().startCompleter();
 }
@@ -164,6 +164,8 @@ void ReplicatingSubscription::sendDequeu
 void ReplicatingSubscription::sendPositionEvent(
     SequenceNumber position, const sys::Mutex::ScopedLock&l )
 {
+    QPID_LOG(trace, logPrefix << "Sending position " << position
+             << ", was " << backupPosition);
     string buf(backupPosition.encodedSize(),'\0');
     framing::Buffer buffer(&buf[0], buf.size());
     position.encode(buffer);
@@ -207,7 +209,6 @@ void ReplicatingSubscription::sendEvent(
 // the messageLock in the queue. Called in arbitrary connection threads.
 void ReplicatingSubscription::dequeued(const QueuedMessage& m)
 {
-    QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
     {
         sys::Mutex::ScopedLock l(lock);
         dequeues.add(m.position);
@@ -219,8 +220,10 @@ void ReplicatingSubscription::dequeued(c
         // not under the message lock?
         if (m.position > position) {
             m.payload->getIngressCompletion().finishCompleter();
-            QPID_LOG(trace, logPrefix << "Completed message " << m.position << 
" early");
+            QPID_LOG(trace, logPrefix << "Dequeued and completed message " << 
m.position << " early");
         }
+        else
+            QPID_LOG(trace, logPrefix << "Dequeued message " << m.position);
     }
     notify();                   // Ensure a call to doDispatch
 }

Modified: 
qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: 
http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1233678&r1=1233677&r2=1233678&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 
(original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h 
Thu Jan 19 23:08:02 2012
@@ -76,13 +76,15 @@ class ReplicatingSubscription : public b
 
     ~ReplicatingSubscription();
 
-    void cancel();
+    // QueueObserver overrides.
     bool deliver(broker::QueuedMessage& msg);
     void enqueued(const broker::QueuedMessage&);
     void dequeued(const broker::QueuedMessage&);
     void acquired(const broker::QueuedMessage&) {}
     void requeued(const broker::QueuedMessage&) {}
 
+    // Consumer overrides.
+    void cancel();
     bool isDelayedCompletion() const { return true; }
 
   protected:



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to