Author: kgiusti
Date: Fri Mar  2 20:51:19 2012
New Revision: 1296448

URL: http://svn.apache.org/viewvc?rev=1296448&view=rev
Log:
QPID-3875: allow direct access to per-thread statistics

Modified:
    qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Class.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h

Modified: qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Class.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Class.h?rev=1296448&r1=1296447&r2=1296448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Class.h (original)
+++ qpid/trunk/qpid/cpp/managementgen/qmfgen/templates/Class.h Fri Mar  2 20:51:19 2012
@@ -53,9 +53,12 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Cl
 /*MGEN:Class.InstDeclarations*/
 /*MGEN:IF(Class.ExistPerThreadStats)*/
     // Per-Thread Statistics
+
+ public:    
     struct PerThreadStats {
 /*MGEN:Class.PerThreadDeclarations*/
     };
+ private:
 
     struct PerThreadStats** perThreadStatsArray;
 
@@ -120,6 +123,11 @@ QPID_BROKER_CLASS_EXTERN class /*MGEN:Cl
 /*MGEN:Class.MethodIdDeclarations*/
     // Accessor Methods
 /*MGEN:Class.AccessorMethods*/
+
+/*MGEN:IF(Class.ExistPerThreadStats)*/
+    struct PerThreadStats* getStatistics() { return getThreadStats(); }
+    void statisticsUpdated() { instChanged = true; }
+/*MGEN:ENDIF*/
 };
 
 }/*MGEN:Class.CloseNamespaces*/

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=1296448&r1=1296447&r2=1296448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Fri Mar  2 20:51:19 2012
@@ -185,11 +185,13 @@ void Connection::recordFromServer(const 
     // Don't record management stats in cluster-unsafe contexts
     if (mgmtObject != 0 && isClusterSafe())
     {
-        mgmtObject->inc_framesToClient();
-        mgmtObject->inc_bytesToClient(frame.encodedSize());
-        if (isMessage(frame.getMethod())) {
-            mgmtObject->inc_msgsToClient();
-        }
+      qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
+      cStats->framesToClient += 1;
+      cStats->bytesToClient += frame.encodedSize();
+      if (isMessage(frame.getMethod())) {
+       cStats->msgsToClient += 1;
+      }
+      mgmtObject->statisticsUpdated();
     }
 }
 
@@ -198,11 +200,13 @@ void Connection::recordFromClient(const 
     // Don't record management stats in cluster-unsafe contexts
     if (mgmtObject != 0 && isClusterSafe())
     {
-        mgmtObject->inc_framesFromClient();
-        mgmtObject->inc_bytesFromClient(frame.encodedSize());
-        if (isMessage(frame.getMethod())) {
-            mgmtObject->inc_msgsFromClient();
-        }
+      qmf::org::apache::qpid::broker::Connection::PerThreadStats *cStats = mgmtObject->getStatistics();
+      cStats->framesFromClient += 1;
+      cStats->bytesFromClient += frame.encodedSize();
+      if (isMessage(frame.getMethod())) {
+       cStats->msgsFromClient += 1;
+      }
+      mgmtObject->statisticsUpdated();
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=1296448&r1=1296447&r2=1296448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Fri Mar  2 20:51:19 2012
@@ -135,20 +135,23 @@ void Exchange::doRoute(Deliverable& msg,
 
     if (mgmtExchange != 0)
     {
-        mgmtExchange->inc_msgReceives  ();
-        mgmtExchange->inc_byteReceives (msg.contentSize ());
-        if (count == 0)
+      qmf::org::apache::qpid::broker::Exchange::PerThreadStats *eStats = mgmtExchange->getStatistics();
+      uint64_t contentSize = msg.contentSize();
+
+      eStats->msgReceives += 1;
+      eStats->byteReceives += contentSize;
+      if (count == 0)
         {
-            //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
-            mgmtExchange->inc_msgDrops  ();
-            mgmtExchange->inc_byteDrops (msg.contentSize ());
-            if (brokerMgmtObject)
-                brokerMgmtObject->inc_discardsNoRoute();
+         //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
+         eStats->msgDrops += 1;
+         eStats->byteDrops += contentSize;
+         if (brokerMgmtObject)
+           brokerMgmtObject->inc_discardsNoRoute();
         }
-        else
+      else
         {
-            mgmtExchange->inc_msgRoutes  (count);
-            mgmtExchange->inc_byteRoutes (count * msg.contentSize ());
+         eStats->msgRoutes += count;
+         eStats->byteRoutes += count * contentSize;
         }
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1296448&r1=1296447&r2=1296448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Mar  2 20:51:19 2012
@@ -88,8 +88,57 @@ const std::string qpidInsertSequenceNumb
 
 const int ENQUEUE_ONLY=1;
 const int ENQUEUE_AND_DEQUEUE=2;
+
+inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg,
+                        _qmf::Queue* mgmtObject,
+                        _qmf::Broker* brokerMgmtObject)
+{
+  if (mgmtObject != 0) {
+    qmf::org::apache::qpid::broker::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+    qmf::org::apache::qpid::broker::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+
+    uint64_t contentSize = msg->contentSize();
+    qStats->msgTotalEnqueues +=1;
+    bStats->msgTotalEnqueues += 1;
+    qStats->byteTotalEnqueues += contentSize;
+    bStats->byteTotalEnqueues += contentSize;
+    if (msg->isPersistent ()) {
+      qStats->msgPersistEnqueues += 1;
+      bStats->msgPersistEnqueues += 1;
+      qStats->bytePersistEnqueues += contentSize;
+      bStats->bytePersistEnqueues += contentSize;
+    }
+    mgmtObject->statisticsUpdated();
+    brokerMgmtObject->statisticsUpdated();
+  }
+}
+
+inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg,
+                        _qmf::Queue* mgmtObject,
+                        _qmf::Broker* brokerMgmtObject)
+{
+  if (mgmtObject != 0){
+    qmf::org::apache::qpid::broker::Queue::PerThreadStats *qStats = mgmtObject->getStatistics();
+    qmf::org::apache::qpid::broker::Broker::PerThreadStats *bStats = brokerMgmtObject->getStatistics();
+    uint64_t contentSize = msg->contentSize();
+
+    qStats->msgTotalDequeues += 1;
+    bStats->msgTotalDequeues += 1;
+    qStats->byteTotalDequeues += contentSize;
+    bStats->byteTotalDequeues += contentSize;
+    if (msg->isPersistent ()){
+      qStats->msgPersistDequeues += 1;
+      bStats->msgPersistDequeues += 1;
+      qStats->bytePersistDequeues += contentSize;
+      bStats->bytePersistDequeues += contentSize;
+    }
+    mgmtObject->statisticsUpdated();
+    brokerMgmtObject->statisticsUpdated();
+  }
 }
 
+} // namespace
+
 Queue::Queue(const string& _name, bool _autodelete,
              MessageStore* const _store,
              const OwnershipToken* const _owner,
@@ -238,7 +287,7 @@ void Queue::requeue(const QueuedMessage&
                 if (brokerMgmtObject)
                     brokerMgmtObject->inc_abandoned();
             }
-            mgntDeqStats(msg.payload);
+            mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
         } else {
             messages->release(msg);
             listeners.populate(copy);
@@ -951,7 +1000,7 @@ bool Queue::popAndDequeue(QueuedMessage&
  */
 void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
 {
-    mgntDeqStats(msg.payload);
+    mgntDeqStats(msg.payload, mgmtObject, brokerMgmtObject);
     if (policy.get()) policy->dequeued(msg);
     messages->deleted(msg);
     for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
@@ -1512,7 +1561,7 @@ void Queue::observeEnqueue(const QueuedM
     if (policy.get()) {
         policy->enqueued(m);
     }
-    mgntEnqStats(m.payload);
+    mgntEnqStats(m.payload, mgmtObject, brokerMgmtObject);
 }
 
 void Queue::updateEnqueued(const QueuedMessage& m)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=1296448&r1=1296447&r2=1296448&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Mar  2 20:51:19 2012
@@ -157,38 +157,6 @@ class Queue : public boost::enable_share
     void forcePersistent(QueuedMessage& msg);
     int getEventMode();
     void configureImpl(const qpid::framing::FieldTable& settings);
-
-    inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
-    {
-        if (mgmtObject != 0) {
-            mgmtObject->inc_msgTotalEnqueues ();
-            mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
-            brokerMgmtObject->inc_msgTotalEnqueues ();
-            brokerMgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
-            if (msg->isPersistent ()) {
-                mgmtObject->inc_msgPersistEnqueues ();
-                mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
-                brokerMgmtObject->inc_msgPersistEnqueues ();
-                brokerMgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
-            }
-        }
-    }
-    inline void mgntDeqStats(const boost::intrusive_ptr<Message>& msg)
-    {
-        if (mgmtObject != 0){
-            mgmtObject->inc_msgTotalDequeues  ();
-            mgmtObject->inc_byteTotalDequeues (msg->contentSize());
-            brokerMgmtObject->inc_msgTotalDequeues  ();
-            brokerMgmtObject->inc_byteTotalDequeues (msg->contentSize());
-            if (msg->isPersistent ()){
-                mgmtObject->inc_msgPersistDequeues ();
-                mgmtObject->inc_bytePersistDequeues (msg->contentSize());
-                brokerMgmtObject->inc_msgPersistDequeues ();
-                brokerMgmtObject->inc_bytePersistDequeues (msg->contentSize());
-            }
-        }
-    }
-
     void checkNotDeleted(const Consumer::shared_ptr& c);
     void notifyDeleted();
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to