Author: aconway
Date: Mon May  4 18:38:18 2009
New Revision: 771392

URL: http://svn.apache.org/viewvc?rev=771392&view=rev
Log:
LatencyTracker: a tool for measuring latencies.

Added measurement points to cluster code.

Added:
    qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=771392&r1=771391&r2=771392&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Mon May  4 18:38:18 2009
@@ -78,7 +78,8 @@
   qpid/cluster/PollerDispatch.h                        \
   qpid/cluster/ProxyInputHandler.h             \
   qpid/cluster/Quorum.h                                \
-  qpid/cluster/types.h 
+  qpid/cluster/types.h                                 \
+  qpid/sys/LatencyTracker.h
 
 cluster_la_LIBADD=  -lcpg $(libcman) libqpidbroker.la libqpidclient.la
 cluster_la_LDFLAGS = $(PLUGINLDFLAGS)

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=771392&r1=771391&r2=771392&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon May  4 18:38:18 2009
@@ -48,6 +48,7 @@
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
 #include "qpid/sys/Thread.h"
+#include "qpid/sys/LatencyTracker.h"
 
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
@@ -196,8 +197,8 @@
     leave(l);
 }
 
-#define LEAVE_TRY(STMT) try { STMT; } \
-    catch (const std::exception& e) { \
+#define LEAVE_TRY(STMT) try { STMT; }                                   \
+    catch (const std::exception& e) {                                   \
         QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \
     } do {} while(0)
 
@@ -228,17 +229,20 @@
 }
 
 void Cluster::deliverEvent(const Event& e) {
+    LATENCY_START(Event, "enqueue event", e.getData());
     deliverEventQueue.push(e);
 }
 
 void Cluster::deliverFrame(const EventFrame& e) {
+    LATENCY_START(EventFrame, "enqueue frame", e.frame.getBody());
     deliverFrameQueue.push(e);
 }
 
 // Handler for deliverEventQueue.
 // This thread decodes frames from events.
 void Cluster::deliveredEvent(const Event& e) {
-        QPID_LOG(trace, *this << " DLVR: " << e);
+    LATENCY_STAGE(Event, "dequeue event", e.getData());
+    QPID_LOG(trace, *this << " DLVR: " << e);
     if (e.isCluster()) {
         EventFrame ef(e, e.getFrame());
         // Stop the deliverEventQueue on update offers.
@@ -253,9 +257,10 @@
             deliverFrame(EventFrame(e, e.getFrame()));
         else
             decoder.decode(e, e.getData());
-}
+    }
     else // Discard connection events if discarding is set.
         QPID_LOG(trace, *this << " DROP: " << e);
+    LATENCY_END(Event, "processed event", e.getData());
 }
 
 void Cluster::flagError(Connection& connection, ErrorCheck::ErrorType type) {
@@ -266,11 +271,13 @@
 // Handler for deliverFrameQueue.
 // This thread executes the main logic.
 void Cluster::deliveredFrame(const EventFrame& e) {
+    LATENCY_STAGE(EventFrame, "dequeued frame", e.frame.getBody());
     Mutex::ScopedLock l(lock);
     // Process each frame through the error checker.
     error.delivered(e);
     while (error.canProcess())  // There is a frame ready to process.
         processFrame(error.getNext(), l);
+    LATENCY_END(EventFrame, "processed frame", e.frame.getBody());
 }
 
 void Cluster::processFrame(const EventFrame& e, Lock& l) {
@@ -543,7 +550,7 @@
     Lock l(lock);
     QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
     switch (methodId) {
-    case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
+      case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
         {
             _qmf::ArgsClusterStopClusterNode& iargs = 
(_qmf::ArgsClusterStopClusterNode&) args;
             stringstream stream;
@@ -552,10 +559,10 @@
                 stopClusterNode(l);
         }
         break;
-    case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
+      case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
         stopFullCluster(l);
         break;
-    default:
+      default:
         return Manageable::STATUS_UNKNOWN_METHOD;
     }
     return Manageable::STATUS_OK;

Added: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h?rev=771392&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h Mon May  4 18:38:18 2009
@@ -0,0 +1,104 @@
+#ifndef QPID_SYS_LATENCYTRACKER_H
+#define QPID_SYS_LATENCYTRACKER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Time.h"
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Record latency between events in the lifecycle of an object.
+ * For testing/debugging purposes: use the LATENCY_* macros to mark
+ * measurement points and #define QPID_LATENCY_TRACKER to enable them in a build.
+ */
+template <class T> class LatencyTracker
+{
+  public:
+    static void start(const char* name, const void* p) {  
instance.doStart(name, p); }
+    static void stage(const char* name, const void* p) { 
instance.doStage(name, p); }
+    static void end(const char* name, const void* p) { instance.doEnd(name, 
p); }
+
+  private:
+    
+    LatencyTracker() : object(), times(), totals(), count(), names(), index(), 
maxIndex() { }
+    ~LatencyTracker() { print(); }
+
+    void doStart(const char* n, const void* p) { if (!object) { name(n); 
object=p; times[0] = now(); index = 1; } }
+    void doStage(const char* n, const void* p) { if (p == object) { name(n); 
times[index++] = now(); } }
+    void doEnd(const char* n, const void* p) { if (p == object) { name(n); 
times[index++] = now(); record(); object = 0; } }
+
+    void name(const char* n) {
+        if (names[index] == 0) names[index] = n;
+        assert(names[index] == n);
+    }
+ 
+    void record() {
+        if (maxIndex == 0) maxIndex = index;
+        assert(maxIndex == index);
+        for (int i = 0; i < index-1; ++i) 
+            totals[i] += Duration(times[i], times[i+1]);
+        ++count;
+    }
+    
+    void print() {
+        printf("\nLatency from %s (%lu samples, %d stages) :\n", names[0], 
count, maxIndex-1);
+        for (int i = 0; i < maxIndex-1; ++i) 
+            printf("to %s:\t%luus\n", names[i+1], (totals[i]/count)/TIME_USEC);
+    }
+
+    static const int SIZE = 1024;
+    const void* object;
+    AbsTime times[SIZE];
+    unsigned long totals[SIZE];
+    unsigned long count;
+    const char* names[SIZE];
+    int index, maxIndex;
+
+    static LatencyTracker instance;
+};
+
+template <class T> struct LatencyEndOnExit {
+    const char* name;
+    const void* ptr;
+    LatencyEndOnExit(const char* n, const void* p) : name(n), ptr(p) {}
+    ~LatencyEndOnExit() { LatencyTracker<T>::end(name, ptr); }
+};
+
+template <class T> LatencyTracker<T> LatencyTracker<T>::instance;
+
+#if defined(QPID_LATENCY_TRACKER)
+#define LATENCY_START(TAG, NAME, PTR) 
::qpid::sys::LatencyTracker<TAG>::start(NAME, PTR)
+#define LATENCY_STAGE(TAG, NAME, PTR) 
::qpid::sys::LatencyTracker<TAG>::stage(NAME, PTR)
+#define LATENCY_END(TAG, NAME, PTR) 
::qpid::sys::LatencyTracker<TAG>::end(NAME, PTR)
+#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) 
::qpid::sys::LatencyEndOnExit<TAG>(NAME, PTR)
+#else
+#define LATENCY_START(TAG, NAME, PTR) void(0)
+#define LATENCY_STAGE(TAG, NAME, PTR) void(0)
+#define LATENCY_END(TAG, NAME, PTR) void(0)
+#define LATENCY_END_ON_EXIT(TAG, NAME, PTR) void(0) 
+#endif
+
+}} // namespace qpid::sys
+
+#endif  /*!QPID_SYS_LATENCYTRACKER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/sys/LatencyTracker.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to