Author: aconway
Date: Wed May 14 14:45:45 2014
New Revision: 1594587

URL: http://svn.apache.org/r1594587
Log:
QPID-DISPATCH-16: Added per-message logging for messages sent/received by the router.

Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/log.h
    qpid/dispatch/trunk/include/qpid/dispatch/message.h
    qpid/dispatch/trunk/src/alloc.c
    qpid/dispatch/trunk/src/dispatch.c
    qpid/dispatch/trunk/src/log.c
    qpid/dispatch/trunk/src/message.c
    qpid/dispatch/trunk/src/message_private.h
    qpid/dispatch/trunk/tests/system_tests_broker.py

Modified: qpid/dispatch/trunk/include/qpid/dispatch/log.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/log.h?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/log.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/log.h Wed May 14 14:45:45 2014
@@ -50,7 +50,7 @@ void qd_log_impl(qd_log_source_t *source
 #define qd_log(s, c, f, ...) \
     do { if (qd_log_enabled(s,c)) qd_log_impl(s, c, __FILE__, __LINE__, f , ##__VA_ARGS__); } while(0)
 
-/** Set the mask to enable log levels, see qd_log_level_t */
-void qd_log_set_mask(int mask);
+/** Maximum length for a log message */
+int qd_log_max_len();
 
 #endif

Modified: qpid/dispatch/trunk/include/qpid/dispatch/message.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/message.h?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/message.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/message.h Wed May 14 14:45:45 2014
@@ -194,6 +194,13 @@ void qd_message_compose_1(qd_message_t *
 void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content);
 void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1, qd_composed_field_t *content2);
 
+/** Put string representation of a message suitable for logging in buffer.
+ * @return buffer
+ */
+char* qd_message_repr(qd_message_t *msg, char *buffer, size_t len);
+/** Recommended buffer length for qd_message_repr */
+int qd_message_repr_len();
+
 ///@}
 
 #endif

Modified: qpid/dispatch/trunk/src/alloc.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/alloc.c?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/alloc.c (original)
+++ qpid/dispatch/trunk/src/alloc.c Wed May 14 14:45:45 2014
@@ -188,6 +188,7 @@ void *qd_alloc(qd_alloc_type_desc_t *des
 
 void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, void *p)
 {
+    if (!p) return;
     qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1;
     int              idx;
 

Modified: qpid/dispatch/trunk/src/dispatch.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch.c?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch.c (original)
+++ qpid/dispatch/trunk/src/dispatch.c Wed May 14 14:45:45 2014
@@ -26,6 +26,7 @@
 #include "log_private.h"
 #include "router_private.h"
 #include "waypoint_private.h"
+#include "message_private.h"
 
 /**
  * Private Function Prototypes
@@ -63,6 +64,7 @@ qd_dispatch_t *qd_dispatch(const char *p
 
     qd_python_initialize(qd, python_pkgdir);
     qd_config_initialize();
+    qd_message_initialize();
     qd->config = qd_config();
 
     return qd;

Modified: qpid/dispatch/trunk/src/log.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/log.c?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/log.c (original)
+++ qpid/dispatch/trunk/src/log.c Wed May 14 14:45:45 2014
@@ -32,6 +32,8 @@
 #define LIST_MAX 1000
 #define LOG_MAX 640
 
+int qd_log_max_len() { return TEXT_MAX; }
+
 typedef struct qd_log_entry_t qd_log_entry_t;
 
 struct qd_log_entry_t {

Modified: qpid/dispatch/trunk/src/message.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message.c?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message.c (original)
+++ qpid/dispatch/trunk/src/message.c Wed May 14 14:45:45 2014
@@ -20,6 +20,8 @@
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/amqp.h>
 #include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/log.h>
 #include "message_private.h"
 #include "compose_private.h"
 #include <string.h>
@@ -53,6 +55,24 @@ ALLOC_DEFINE(qd_message_content_t);
 
 typedef void (*buffer_process_t) (void *context, const unsigned char *base, int length);
 
+static qd_log_source_t* log_source = 0;
+
+void qd_message_initialize() {
+    log_source = qd_log_source("MESSAGE");
+}
+
+int qd_message_repr_len() { return qd_log_max_len(); }
+
+/* TODO aconway 2014-05-13: more detailed message representation. */
+char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len) {
+    qd_field_iterator_t* iter = qd_message_field_iterator((qd_message_t*)msg, QD_FIELD_TO);
+    char* to = iter? (char*)qd_field_iterator_copy(iter) : 0;
+    qd_field_iterator_free(iter);
+    snprintf(buffer, len, "Message(%p){to=%s}", msg, to? to:"");
+    free(to);
+    return buffer;
+}
+
 static void advance(unsigned char **cursor, qd_buffer_t **buffer, int consume, buffer_process_t handler, void *context)
 {
     unsigned char *local_cursor = *cursor;
@@ -581,7 +601,13 @@ qd_message_t *qd_message_receive(qd_deli
                 qd_buffer_free(buf);
             }
             qd_delivery_set_context(delivery, 0);
-            return (qd_message_t*) msg;
+
+           char repr[qd_message_repr_len()];
+           qd_log(log_source, QD_LOG_TRACE, "%s received, link=%s",
+                  qd_message_repr((qd_message_t*)msg, repr, sizeof(repr)),
+                  pn_link_name(link));
+
+           return (qd_message_t*) msg;
         }
 
         if (rc > 0) {
@@ -627,6 +653,11 @@ void qd_message_send(qd_message_t *in_ms
     unsigned char        *cursor;
     pn_link_t            *pnl     = qd_link_pn(link);
 
+    char repr[qd_message_repr_len()];
+    qd_log(log_source, QD_LOG_TRACE, "%s sending, link=%s",
+          qd_message_repr(in_msg, repr, sizeof(repr)),
+          pn_link_name(pnl));
+
     if (DEQ_SIZE(content->new_delivery_annotations) > 0) {
         //
         // This is the case where the delivery annotations have been modified.
@@ -812,6 +843,9 @@ int qd_message_check(qd_message_t *in_ms
     result = qd_message_check_LH(content, depth);
     sys_mutex_unlock(content->lock);
 
+    char repr[qd_message_repr_len()];
+    qd_log(log_source, QD_LOG_TRACE, "%s check",
+          qd_message_repr(in_msg, repr, sizeof(repr)));
     return result;
 }
 

Modified: qpid/dispatch/trunk/src/message_private.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message_private.h?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message_private.h (original)
+++ qpid/dispatch/trunk/src/message_private.h Wed May 14 14:45:45 2014
@@ -100,6 +100,9 @@ ALLOC_DECLARE(qd_message_content_t);
 
 #define MSG_CONTENT(m) (((qd_message_pvt_t*) m)->content)
 
+/** Initialize logging */
+void qd_message_initialize();
+
 ///@}
 
 #endif

Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Wed May 14 14:45:45 2014
@@ -48,7 +48,9 @@ class BrokerSystemTest(TestCase):
 
         # Start a qdrouterd
         router_conf = Qdrouterd.Config([
-            ('log', { 'module':'DEFAULT', 'level':'TRACE' }),
+            ('log', { 'module':'DEFAULT', 'level':'NOTICE' }),
+            ('log', { 'module':'ROUTER', 'level':'TRACE' }),
+            ('log', { 'module':'MESSAGE', 'level':'TRACE' }),
             ('container', {'container-name':self.id()}),
             ('container', {'container-name':self.id()}),
             ('router', { 'mode': 'standalone', 'router-id': self.id() }),
@@ -62,23 +64,24 @@ class BrokerSystemTest(TestCase):
 
         # Wait for broker & router to be ready
         wait_ports([q.port for q in qpidd] + router.ports)
+        qpidd[0].agent.addQueue(testq)
 
-        # Smoke test for qpidd
-        qc = self.cleanup(qm.Connection.establish(qpidd[0].address))
-        qc.session().sender(testq+";{create:always}").send("a")
-        qr = qc.session().receiver(testq)
-        self.assertEqual(qr.fetch(1).content, "a")
-
-        # Smoke test for dispatch.
-        addr = router.addresses[0]+"/xxx/1"
-        m1, m2 = self.messenger(), self.messenger()
-        m2.subscribe(addr)
-        m1.put(self.message(address=addr, body="b"))
-        m1.send()
+        # Test for waypoint routing via queue
+        m=self.message(address=router.addresses[0]+"/"+testq, body="c")
+        msgr = self.messenger()
+        time.sleep(3)           # FIXME aconway 2014-05-07: race on router
+        msgr.subscribe(m.address)
+        time.sleep(3)           # FIXME aconway 2014-05-07: race on router
+        msgr.put(m)
+        msgr.send()
         msg = Message()
-        m2.recv(1)
-        m2.get(msg)
-        self.assertEqual(msg.body, "b")
+        msgr.recv(1)
+        msgr.get(msg)
+        msgr.accept()
+        self.assertEqual(msg.body, m.body)
+        aq = qpidd[0].agent.getQueue(testq)
+        aq.update()
+        self.assertEquals((aq.msgTotalEnqueues, aq.msgTotalDequeues), (1,1))
 
         # FIXME aconway 2014-05-05: test for waypoint routing via queue
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to