Author: aconway
Date: Wed May 14 14:45:45 2014
New Revision: 1594587
URL: http://svn.apache.org/r1594587
Log:
QPID-DISPATCH-16: Added per-message logging for messages sent/received by the
router.
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/log.h
qpid/dispatch/trunk/include/qpid/dispatch/message.h
qpid/dispatch/trunk/src/alloc.c
qpid/dispatch/trunk/src/dispatch.c
qpid/dispatch/trunk/src/log.c
qpid/dispatch/trunk/src/message.c
qpid/dispatch/trunk/src/message_private.h
qpid/dispatch/trunk/tests/system_tests_broker.py
Modified: qpid/dispatch/trunk/include/qpid/dispatch/log.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/log.h?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/log.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/log.h Wed May 14 14:45:45 2014
@@ -50,7 +50,7 @@ void qd_log_impl(qd_log_source_t *source
#define qd_log(s, c, f, ...) \
do { if (qd_log_enabled(s,c)) qd_log_impl(s, c, __FILE__, __LINE__, f ,
##__VA_ARGS__); } while(0)
-/** Set the mask to enable log levels, see qd_log_level_t */
-void qd_log_set_mask(int mask);
+/** Maximum length for a log message */
+int qd_log_max_len();
#endif
Modified: qpid/dispatch/trunk/include/qpid/dispatch/message.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/message.h?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/message.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/message.h Wed May 14 14:45:45 2014
@@ -194,6 +194,13 @@ void qd_message_compose_1(qd_message_t *
void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *content);
void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *content1,
qd_composed_field_t *content2);
+/** Put string representation of a message suitable for logging in buffer.
+ * @return buffer
+ */
+char* qd_message_repr(qd_message_t *msg, char *buffer, size_t len);
+/** Recommended buffer length for qd_message_repr */
+int qd_message_repr_len();
+
///@}
#endif
Modified: qpid/dispatch/trunk/src/alloc.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/alloc.c?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/alloc.c (original)
+++ qpid/dispatch/trunk/src/alloc.c Wed May 14 14:45:45 2014
@@ -188,6 +188,7 @@ void *qd_alloc(qd_alloc_type_desc_t *des
void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, void *p)
{
+ if (!p) return;
qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1;
int idx;
Modified: qpid/dispatch/trunk/src/dispatch.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/dispatch.c?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/dispatch.c (original)
+++ qpid/dispatch/trunk/src/dispatch.c Wed May 14 14:45:45 2014
@@ -26,6 +26,7 @@
#include "log_private.h"
#include "router_private.h"
#include "waypoint_private.h"
+#include "message_private.h"
/**
* Private Function Prototypes
@@ -63,6 +64,7 @@ qd_dispatch_t *qd_dispatch(const char *p
qd_python_initialize(qd, python_pkgdir);
qd_config_initialize();
+ qd_message_initialize();
qd->config = qd_config();
return qd;
Modified: qpid/dispatch/trunk/src/log.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/log.c?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/log.c (original)
+++ qpid/dispatch/trunk/src/log.c Wed May 14 14:45:45 2014
@@ -32,6 +32,8 @@
#define LIST_MAX 1000
#define LOG_MAX 640
+int qd_log_max_len() { return TEXT_MAX; }
+
typedef struct qd_log_entry_t qd_log_entry_t;
struct qd_log_entry_t {
Modified: qpid/dispatch/trunk/src/message.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message.c?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message.c (original)
+++ qpid/dispatch/trunk/src/message.c Wed May 14 14:45:45 2014
@@ -20,6 +20,8 @@
#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/amqp.h>
#include <qpid/dispatch/threading.h>
+#include <qpid/dispatch/iterator.h>
+#include <qpid/dispatch/log.h>
#include "message_private.h"
#include "compose_private.h"
#include <string.h>
@@ -53,6 +55,24 @@ ALLOC_DEFINE(qd_message_content_t);
typedef void (*buffer_process_t) (void *context, const unsigned char *base,
int length);
+static qd_log_source_t* log_source = 0;
+
+void qd_message_initialize() {
+ log_source = qd_log_source("MESSAGE");
+}
+
+int qd_message_repr_len() { return qd_log_max_len(); }
+
+/* TODO aconway 2014-05-13: more detailed message representation. */
+char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len) {
+ qd_field_iterator_t* iter = qd_message_field_iterator((qd_message_t*)msg,
QD_FIELD_TO);
+ char* to = iter? (char*)qd_field_iterator_copy(iter) : 0;
+ qd_field_iterator_free(iter);
+ snprintf(buffer, len, "Message(%p){to=%s}", msg, to? to:"");
+ free(to);
+ return buffer;
+}
+
static void advance(unsigned char **cursor, qd_buffer_t **buffer, int consume,
buffer_process_t handler, void *context)
{
unsigned char *local_cursor = *cursor;
@@ -581,7 +601,13 @@ qd_message_t *qd_message_receive(qd_deli
qd_buffer_free(buf);
}
qd_delivery_set_context(delivery, 0);
- return (qd_message_t*) msg;
+
+ char repr[qd_message_repr_len()];
+ qd_log(log_source, QD_LOG_TRACE, "%s received, link=%s",
+ qd_message_repr((qd_message_t*)msg, repr, sizeof(repr)),
+ pn_link_name(link));
+
+ return (qd_message_t*) msg;
}
if (rc > 0) {
@@ -627,6 +653,11 @@ void qd_message_send(qd_message_t *in_ms
unsigned char *cursor;
pn_link_t *pnl = qd_link_pn(link);
+ char repr[qd_message_repr_len()];
+ qd_log(log_source, QD_LOG_TRACE, "%s sending, link=%s",
+ qd_message_repr(in_msg, repr, sizeof(repr)),
+ pn_link_name(pnl));
+
if (DEQ_SIZE(content->new_delivery_annotations) > 0) {
//
// This is the case where the delivery annotations have been modified.
@@ -812,6 +843,9 @@ int qd_message_check(qd_message_t *in_ms
result = qd_message_check_LH(content, depth);
sys_mutex_unlock(content->lock);
+ char repr[qd_message_repr_len()];
+ qd_log(log_source, QD_LOG_TRACE, "%s check",
+ qd_message_repr(in_msg, repr, sizeof(repr)));
return result;
}
Modified: qpid/dispatch/trunk/src/message_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/message_private.h?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/message_private.h (original)
+++ qpid/dispatch/trunk/src/message_private.h Wed May 14 14:45:45 2014
@@ -100,6 +100,9 @@ ALLOC_DECLARE(qd_message_content_t);
#define MSG_CONTENT(m) (((qd_message_pvt_t*) m)->content)
+/** Initialize logging */
+void qd_message_initialize();
+
///@}
#endif
Modified: qpid/dispatch/trunk/tests/system_tests_broker.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1594587&r1=1594586&r2=1594587&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Wed May 14 14:45:45 2014
@@ -48,7 +48,9 @@ class BrokerSystemTest(TestCase):
# Start a qdrouterd
router_conf = Qdrouterd.Config([
- ('log', { 'module':'DEFAULT', 'level':'TRACE' }),
+ ('log', { 'module':'DEFAULT', 'level':'NOTICE' }),
+ ('log', { 'module':'ROUTER', 'level':'TRACE' }),
+ ('log', { 'module':'MESSAGE', 'level':'TRACE' }),
('container', {'container-name':self.id()}),
('container', {'container-name':self.id()}),
('router', { 'mode': 'standalone', 'router-id': self.id() }),
@@ -62,23 +64,24 @@ class BrokerSystemTest(TestCase):
# Wait for broker & router to be ready
wait_ports([q.port for q in qpidd] + router.ports)
+ qpidd[0].agent.addQueue(testq)
- # Smoke test for qpidd
- qc = self.cleanup(qm.Connection.establish(qpidd[0].address))
- qc.session().sender(testq+";{create:always}").send("a")
- qr = qc.session().receiver(testq)
- self.assertEqual(qr.fetch(1).content, "a")
-
- # Smoke test for dispatch.
- addr = router.addresses[0]+"/xxx/1"
- m1, m2 = self.messenger(), self.messenger()
- m2.subscribe(addr)
- m1.put(self.message(address=addr, body="b"))
- m1.send()
+ # Test for waypoint routing via queue
+ m=self.message(address=router.addresses[0]+"/"+testq, body="c")
+ msgr = self.messenger()
+ time.sleep(3) # FIXME aconway 2014-05-07: race on router
+ msgr.subscribe(m.address)
+ time.sleep(3) # FIXME aconway 2014-05-07: race on router
+ msgr.put(m)
+ msgr.send()
msg = Message()
- m2.recv(1)
- m2.get(msg)
- self.assertEqual(msg.body, "b")
+ msgr.recv(1)
+ msgr.get(msg)
+ msgr.accept()
+ self.assertEqual(msg.body, m.body)
+ aq = qpidd[0].agent.getQueue(testq)
+ aq.update()
+ self.assertEquals((aq.msgTotalEnqueues, aq.msgTotalDequeues), (1,1))
# FIXME aconway 2014-05-05: test for waypoint routing via queue
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]