Author: tross
Date: Wed Sep  4 21:42:22 2013
New Revision: 1520148

URL: http://svn.apache.org/r1520148
Log:
QPID-5068 - Added feature to allow for modification of Delivery Annotations
  - Added annotation for ingress router
  - Added annotation for trace

Modified:
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
    qpid/trunk/qpid/extras/dispatch/src/compose.c
    qpid/trunk/qpid/extras/dispatch/src/message.c
    qpid/trunk/qpid/extras/dispatch/src/message_private.h
    qpid/trunk/qpid/extras/dispatch/src/python_embedded.c
    qpid/trunk/qpid/extras/dispatch/src/router_node.c

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h Wed Sep  4 
21:42:22 2013
@@ -24,6 +24,7 @@
 #include <qpid/dispatch/iterator.h>
 #include <qpid/dispatch/buffer.h>
 #include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/parse.h>
 #include <qpid/dispatch/container.h>
 
 // Callback for status change (confirmed persistent, loaded-in-memory, etc.)
@@ -88,22 +89,89 @@ typedef enum {
     DX_FIELD_REPLY_TO_GROUP_ID
 } dx_message_field_t;
 
-//
-// Functions for allocation
-//
+
+/**
+ * Allocate a new message.
+ *
+ * @return A pointer to a dx_message_t that is the sole reference to a newly 
allocated
+ *         message.
+ */
 dx_message_t *dx_allocate_message(void);
-void dx_free_message(dx_message_t *qm);
-dx_message_t *dx_message_copy(dx_message_t *qm);
-int dx_message_persistent(dx_message_t *qm);
-int dx_message_in_memory(dx_message_t *qm);
 
-//
-// Functions for received messages
-//
+/**
+ * Free a message reference.  If this is the last reference to the message, 
free the
+ * message as well.
+ *
+ * @param msg A pointer to a dx_message_t that is no longer needed.
+ */
+void dx_free_message(dx_message_t *msg);
+
+/**
+ * Make a new reference to an existing message.
+ *
+ * @param msg A pointer to a dx_message_t referencing a message.
+ * @return A new pointer to the same referenced message.
+ */
+dx_message_t *dx_message_copy(dx_message_t *msg);
+
+/**
+ * Retrieve the delivery annotations from a message.
+ *
+ * IMPORTANT: The pointer returned by this function remains owned by the 
message.
+ *            The caller MUST NOT free the parsed field.
+ *
+ * @param msg Pointer to a received message.
+ * @return Pointer to the parsed field for the delivery annotations.  If the 
message doesn't
+ *         have delivery annotations, the return value shall be NULL.
+ */
+dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *msg);
+
+/**
+ * Set the delivery annotations for the message.  If the message already has 
delivery annotations,
+ * they will be overwritten/replaced by the new field.
+ *
+ * @param msg Pointer to a receiver message.
+ * @param da Pointer to a composed field representing the new delivery 
annotations of the message.
+ *           If null, the message will not have a delivery annotations field.
+ *           IMPORTANT: The message will not take ownership of the composed 
field.  The
+ *                      caller is responsible for freeing it after this call.  
Since the contents
+ *                      are copied into the message, it is safe to free the 
composed field
+ *                      any time after the call to this function.
+ */
+void dx_message_set_delivery_annotations(dx_message_t *msg, 
dx_composed_field_t *da);
+
+/**
+ * Receive message data via a delivery.  This function may be called more than 
once on the same
+ * delivery if the message spans multiple frames.  Once a complete message has 
been received, this
+ * function shall return the message.
+ *
+ * @param delivery An incoming delivery from a link
+ * @return A pointer to the complete message or 0 if the message is not yet 
complete.
+ */
 dx_message_t *dx_message_receive(dx_delivery_t *delivery);
+
+/**
+ * Send the message outbound on an outgoing link.
+ *
+ * @param msg A pointer to a message to be sent.
+ * @param link The outgoing link on which to send the message.
+ */
 void dx_message_send(dx_message_t *msg, dx_link_t *link);
 
+/**
+ * Check that the message is well-formed up to a certain depth.  Any part of 
the message that is
+ * beyond the specified depth is not checked for validity.
+ */
 int dx_message_check(dx_message_t *msg, dx_message_depth_t depth);
+
+/**
+ * Return an iterator for the requested message field.  If the field is not in 
the message,
+ * return NULL.
+ *
+ * @param msg A pointer to a message.
+ * @param field The field to be returned via iterator.
+ * @return A field iterator that spans the requested field.
+ */
 dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg, 
dx_message_field_t field);
 
 ssize_t dx_message_field_length(dx_message_t *msg, dx_message_field_t field);

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h Wed Sep  4 
21:42:22 2013
@@ -29,6 +29,7 @@ typedef struct dx_address_t dx_address_t
 
 typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg);
 
+const char *dx_router_id(const dx_dispatch_t *dx);
 
 dx_address_t *dx_router_register_address(dx_dispatch_t        *dx,
                                          const char           *address,

Modified: qpid/trunk/qpid/extras/dispatch/src/compose.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/compose.c?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/compose.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/compose.c Wed Sep  4 21:42:22 2013
@@ -233,12 +233,14 @@ void dx_compose_free(dx_composed_field_t
     while (buf) {
         DEQ_REMOVE_HEAD(field->buffers);
         dx_free_buffer(buf);
+        buf = DEQ_HEAD(field->buffers);
     }
 
     dx_composite_t *comp = DEQ_HEAD(field->fieldStack);
     while (comp) {
         DEQ_REMOVE_HEAD(field->fieldStack);
         free_dx_composite_t(comp);
+        comp = DEQ_HEAD(field->fieldStack);
     }
 
     free_dx_composed_field_t(field);

Modified: qpid/trunk/qpid/extras/dispatch/src/message.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message.c?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message.c Wed Sep  4 21:42:22 2013
@@ -25,11 +25,35 @@
 #include <string.h>
 #include <stdio.h>
 
+static const unsigned char * const MSG_HDR_LONG                 = (unsigned 
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
+static const unsigned char * const MSG_HDR_SHORT                = (unsigned 
char*) "\x00\x53\x70";
+static const unsigned char * const DELIVERY_ANNOTATION_LONG     = (unsigned 
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
+static const unsigned char * const DELIVERY_ANNOTATION_SHORT    = (unsigned 
char*) "\x00\x53\x71";
+static const unsigned char * const MESSAGE_ANNOTATION_LONG      = (unsigned 
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
+static const unsigned char * const MESSAGE_ANNOTATION_SHORT     = (unsigned 
char*) "\x00\x53\x72";
+static const unsigned char * const PROPERTIES_LONG              = (unsigned 
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
+static const unsigned char * const PROPERTIES_SHORT             = (unsigned 
char*) "\x00\x53\x73";
+static const unsigned char * const APPLICATION_PROPERTIES_LONG  = (unsigned 
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
+static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned 
char*) "\x00\x53\x74";
+static const unsigned char * const BODY_DATA_LONG               = (unsigned 
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
+static const unsigned char * const BODY_DATA_SHORT              = (unsigned 
char*) "\x00\x53\x75";
+static const unsigned char * const BODY_SEQUENCE_LONG           = (unsigned 
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
+static const unsigned char * const BODY_SEQUENCE_SHORT          = (unsigned 
char*) "\x00\x53\x76";
+static const unsigned char * const BODY_VALUE_LONG              = (unsigned 
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
+static const unsigned char * const BODY_VALUE_SHORT             = (unsigned 
char*) "\x00\x53\x77";
+static const unsigned char * const FOOTER_LONG                  = (unsigned 
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
+static const unsigned char * const FOOTER_SHORT                 = (unsigned 
char*) "\x00\x53\x78";
+static const unsigned char * const TAGS_LIST                    = (unsigned 
char*) "\x45\xc0\xd0";
+static const unsigned char * const TAGS_MAP                     = (unsigned 
char*) "\xc1\xd1";
+static const unsigned char * const TAGS_BINARY                  = (unsigned 
char*) "\xa0\xb0";
+static const unsigned char * const TAGS_ANY                     = (unsigned 
char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
+
 ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0);
 ALLOC_DEFINE(dx_message_content_t);
 
+typedef void (*buffer_process_t) (void *context, const unsigned char *base, 
int length);
 
-static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume)
+static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume, 
buffer_process_t handler, void *context)
 {
     unsigned char *local_cursor = *cursor;
     dx_buffer_t   *local_buffer = *buffer;
@@ -37,9 +61,13 @@ static void advance(unsigned char **curs
     int remaining = dx_buffer_size(local_buffer) - (local_cursor - 
dx_buffer_base(local_buffer));
     while (consume > 0) {
         if (consume < remaining) {
+            if (handler)
+                handler(context, local_cursor, consume);
             local_cursor += consume;
             consume = 0;
         } else {
+            if (handler)
+                handler(context, local_cursor, remaining);
             consume -= remaining;
             local_buffer = local_buffer->next;
             if (local_buffer == 0){
@@ -59,7 +87,7 @@ static void advance(unsigned char **curs
 static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer)
 {
     unsigned char result = **cursor;
-    advance(cursor, buffer, 1);
+    advance(cursor, buffer, 1, 0, 0);
     return result;
 }
 
@@ -103,7 +131,7 @@ static int traverse_field(unsigned char 
         field->parsed = 1;
     }
 
-    advance(cursor, buffer, consume);
+    advance(cursor, buffer, consume, 0, 0);
     return 1;
 }
 
@@ -249,7 +277,7 @@ static int dx_check_and_advance(dx_buffe
 
     location->length = pre_consume + consume;
     if (consume)
-        advance(&test_cursor, &test_buffer, consume);
+        advance(&test_cursor, &test_buffer, consume, 0, 0);
 
     *cursor = test_cursor;
     *buffer = test_buffer;
@@ -354,6 +382,7 @@ dx_message_t *dx_allocate_message()
     msg->content->lock        = sys_mutex();
     msg->content->ref_count   = 1;
     msg->content->parse_depth = DX_DEPTH_NONE;
+    msg->content->parsed_delivery_annotations = 0;
 
     return (dx_message_t*) msg;
 }
@@ -370,14 +399,23 @@ void dx_free_message(dx_message_t *in_ms
     sys_mutex_unlock(content->lock);
 
     if (rc == 0) {
-        dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+        if (content->parsed_delivery_annotations)
+            dx_parse_free(content->parsed_delivery_annotations);
 
+        dx_buffer_t *buf = DEQ_HEAD(content->buffers);
         while (buf) {
             DEQ_REMOVE_HEAD(content->buffers);
             dx_free_buffer(buf);
             buf = DEQ_HEAD(content->buffers);
         }
 
+        buf = DEQ_HEAD(content->new_delivery_annotations);
+        while (buf) {
+            DEQ_REMOVE_HEAD(content->new_delivery_annotations);
+            dx_free_buffer(buf);
+            buf = DEQ_HEAD(content->new_delivery_annotations);
+        }
+
         sys_mutex_free(content->lock);
         free_dx_message_content_t(content);
     }
@@ -406,6 +444,42 @@ dx_message_t *dx_message_copy(dx_message
 }
 
 
+dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *in_msg)
+{
+    dx_message_pvt_t     *msg     = (dx_message_pvt_t*) in_msg;
+    dx_message_content_t *content = msg->content;
+
+    if (content->parsed_delivery_annotations)
+        return content->parsed_delivery_annotations;
+
+    dx_field_iterator_t *da = dx_message_field_iterator(in_msg, 
DX_FIELD_DELIVERY_ANNOTATION);
+    if (da == 0)
+        return 0;
+
+    content->parsed_delivery_annotations = dx_parse(da);
+    if (content->parsed_delivery_annotations == 0 ||
+        !dx_parse_ok(content->parsed_delivery_annotations) ||
+        !dx_parse_is_map(content->parsed_delivery_annotations)) {
+        dx_field_iterator_free(da);
+        dx_parse_free(content->parsed_delivery_annotations);
+        return 0;
+    }
+
+    return content->parsed_delivery_annotations;
+}
+
+
+void dx_message_set_delivery_annotations(dx_message_t *msg, 
dx_composed_field_t *da)
+{
+    dx_message_content_t *content       = MSG_CONTENT(msg);
+    dx_buffer_list_t     *field_buffers = dx_compose_buffers(da);
+
+    assert(DEQ_SIZE(content->new_delivery_annotations) == 0);
+    content->new_delivery_annotations = *field_buffers;
+    DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
+}
+
+
 dx_message_t *dx_message_receive(dx_delivery_t *delivery)
 {
     pn_delivery_t    *pnd  = dx_delivery_pn(delivery);
@@ -486,14 +560,74 @@ dx_message_t *dx_message_receive(dx_deli
 }
 
 
+static void send_handler(void *context, const unsigned char *start, int length)
+{
+    pn_link_t *pnl = (pn_link_t*) context;
+    pn_link_send(pnl, (const char*) start, length);
+}
+
+
 void dx_message_send(dx_message_t *in_msg, dx_link_t *link)
 {
-    dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
-    dx_buffer_t      *buf = DEQ_HEAD(msg->content->buffers);
+    dx_message_pvt_t     *msg     = (dx_message_pvt_t*) in_msg;
+    dx_message_content_t *content = msg->content;
+    dx_buffer_t          *buf     = DEQ_HEAD(content->buffers);
+    unsigned char        *cursor;
+    pn_link_t            *pnl     = dx_link_pn(link);
+
+    if (DEQ_SIZE(content->new_delivery_annotations) > 0) {
+        //
+        // This is the case where the delivery annotations have been modified.
+        // The message send must be divided into sections:  The existing 
header;
+        // the new delivery annotations; the rest of the existing message.
+        // Note that the original delivery annotations that are still in the
+        // buffer chain must not be sent.
+        //
+        // Start by making sure that we've parsed the message sections through
+        // the delivery annotations
+        //
+        if (!dx_message_check(in_msg, DX_DEPTH_DELIVERY_ANNOTATIONS))
+            return;
+
+        //
+        // Send header if present
+        //
+        cursor = dx_buffer_base(buf);
+        if (content->section_message_header.length > 0) {
+            pn_link_send(pnl, (const char*) MSG_HDR_SHORT, 3);
+            buf    = content->section_message_header.buffer;
+            cursor = content->section_message_header.offset + 
dx_buffer_base(buf);
+            advance(&cursor, &buf, content->section_message_header.length, 
send_handler, (void*) pnl);
+        }
+
+        //
+        // Send new delivery annotations
+        //
+        dx_buffer_t *da_buf = DEQ_HEAD(content->new_delivery_annotations);
+        while (da_buf) {
+            pn_link_send(pnl, (char*) dx_buffer_base(da_buf), 
dx_buffer_size(da_buf));
+            da_buf = DEQ_NEXT(da_buf);
+        }
+
+        //
+        // Skip over replaced delivery annotations
+        //
+        if (content->section_delivery_annotation.length > 0)
+            advance(&cursor, &buf, 
content->section_delivery_annotation.length, 0, 0);
+
+        //
+        // Send remaining partial buffer
+        //
+        if (buf) {
+            size_t len = dx_buffer_size(buf) - (cursor - dx_buffer_base(buf));
+            advance(&cursor, &buf, len, send_handler, (void*) pnl);
+        }
+
+        // Fall through to process the remaining buffers normally
+    }
 
-    // TODO - Handle cases where annotations have been added or modified
     while (buf) {
-        pn_link_send(dx_link_pn(link), (char*) dx_buffer_base(buf), 
dx_buffer_size(buf));
+        pn_link_send(pnl, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
         buf = DEQ_NEXT(buf);
     }
 }
@@ -523,29 +657,6 @@ static int dx_check_field_LH(dx_message_
 
 static int dx_message_check_LH(dx_message_content_t *content, 
dx_message_depth_t depth)
 {
-    static const unsigned char * const MSG_HDR_LONG                 = 
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
-    static const unsigned char * const MSG_HDR_SHORT                = 
(unsigned char*) "\x00\x53\x70";
-    static const unsigned char * const DELIVERY_ANNOTATION_LONG     = 
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
-    static const unsigned char * const DELIVERY_ANNOTATION_SHORT    = 
(unsigned char*) "\x00\x53\x71";
-    static const unsigned char * const MESSAGE_ANNOTATION_LONG      = 
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
-    static const unsigned char * const MESSAGE_ANNOTATION_SHORT     = 
(unsigned char*) "\x00\x53\x72";
-    static const unsigned char * const PROPERTIES_LONG              = 
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
-    static const unsigned char * const PROPERTIES_SHORT             = 
(unsigned char*) "\x00\x53\x73";
-    static const unsigned char * const APPLICATION_PROPERTIES_LONG  = 
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
-    static const unsigned char * const APPLICATION_PROPERTIES_SHORT = 
(unsigned char*) "\x00\x53\x74";
-    static const unsigned char * const BODY_DATA_LONG               = 
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
-    static const unsigned char * const BODY_DATA_SHORT              = 
(unsigned char*) "\x00\x53\x75";
-    static const unsigned char * const BODY_SEQUENCE_LONG           = 
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
-    static const unsigned char * const BODY_SEQUENCE_SHORT          = 
(unsigned char*) "\x00\x53\x76";
-    static const unsigned char * const BODY_VALUE_LONG              = 
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
-    static const unsigned char * const BODY_VALUE_SHORT             = 
(unsigned char*) "\x00\x53\x77";
-    static const unsigned char * const FOOTER_LONG                  = 
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
-    static const unsigned char * const FOOTER_SHORT                 = 
(unsigned char*) "\x00\x53\x78";
-    static const unsigned char * const TAGS_LIST                    = 
(unsigned char*) "\x45\xc0\xd0";
-    static const unsigned char * const TAGS_MAP                     = 
(unsigned char*) "\xc1\xd1";
-    static const unsigned char * const TAGS_BINARY                  = 
(unsigned char*) "\xa0\xb0";
-    static const unsigned char * const TAGS_ANY                     = 
(unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
-
     dx_buffer_t *buffer  = DEQ_HEAD(content->buffers);
 
     if (!buffer)

Modified: qpid/trunk/qpid/extras/dispatch/src/message_private.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message_private.h?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message_private.h Wed Sep  4 21:42:22 
2013
@@ -58,11 +58,15 @@ typedef struct {
 //            1) Received message is held and forwarded unmodified - single 
buffer list
 //            2) Received message is held and modified before forwarding - two 
buffer lists
 //            3) Message is composed internally - single buffer list
+// TODO - provide a way to allocate a message without a lock for the 
link-routing case.
+//        It's likely that link-routing will cause no contention for the 
message content.
+//
 
 typedef struct {
     sys_mutex_t         *lock;
-    uint32_t             ref_count;                       // The number of 
qmessages referencing this
+    uint32_t             ref_count;                       // The number of 
messages referencing this
     dx_buffer_list_t     buffers;                         // The buffer chain 
containing the message
+    dx_buffer_list_t     new_delivery_annotations;        // The buffer chain 
containing the new delivery annotations
     dx_field_location_t  section_message_header;          // The message 
header list
     dx_field_location_t  section_delivery_annotation;     // The delivery 
annotation map
     dx_field_location_t  section_message_annotation;      // The message 
annotation map
@@ -77,10 +81,11 @@ typedef struct {
     dx_buffer_t         *parse_buffer;
     unsigned char       *parse_cursor;
     dx_message_depth_t   parse_depth;
+    dx_parsed_field_t   *parsed_delivery_annotations;
 } dx_message_content_t;
 
 typedef struct {
-    DEQ_LINKS(dx_message_t);                              // Deq linkage that 
overlays the dx_message_t
+    DEQ_LINKS(dx_message_t);   // Deque linkage that overlays the dx_message_t
     dx_message_content_t *content;
 } dx_message_pvt_t;
 

Modified: qpid/trunk/qpid/extras/dispatch/src/python_embedded.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/python_embedded.c?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/python_embedded.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/python_embedded.c Wed Sep  4 21:42:22 
2013
@@ -495,18 +495,28 @@ static void IoAdapter_dealloc(IoAdapter*
 
 static PyObject* dx_python_send(PyObject *self, PyObject *args)
 {
-    IoAdapter  *ioa = (IoAdapter*) self;
-    const char *address;
-    PyObject   *app_properties;
-    PyObject   *body;
+    IoAdapter           *ioa   = (IoAdapter*) self;
+    dx_composed_field_t *field = 0;
+    const char          *address;
+    PyObject            *app_properties;
+    PyObject            *body;
+
     if (!PyArg_ParseTuple(args, "sOO", &address, &app_properties, &body))
         return 0;
 
-    dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0);
+    field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field);
+    dx_compose_start_map(field);
+
+    dx_compose_insert_string(field, "qdx.ingress");
+    dx_compose_insert_string(field, dx_router_id(ioa->dx));
+
+    dx_compose_insert_string(field, "qdx.trace");
     dx_compose_start_list(field);
-    dx_compose_insert_bool(field, 0);     // durable
+    dx_compose_insert_string(field, dx_router_id(ioa->dx));
     dx_compose_end_list(field);
 
+    dx_compose_end_map(field);
+
     field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
     dx_compose_start_list(field);
     dx_compose_insert_null(field);            // message-id

Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Wed Sep  4 21:42:22 2013
@@ -250,6 +250,56 @@ static int router_writable_link_handler(
 }
 
 
+static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+{
+    dx_parsed_field_t   *in_da  = dx_message_delivery_annotations(msg);
+    dx_composed_field_t *out_da = 
dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+
+    dx_parsed_field_t *trace   = 0;
+    dx_parsed_field_t *ingress = 0;
+
+    if (in_da) {
+        trace   = dx_parse_value_by_key(in_da, "qdx.trace");
+        ingress = dx_parse_value_by_key(in_da, "qdx.ingress");
+    }
+
+    dx_compose_start_map(out_da);
+
+    //
+    // If there is a trace field, append this router's ID to the trace.
+    //
+    if (trace && dx_parse_is_list(trace)) {
+        dx_compose_insert_string(out_da, "qdx.trace");
+        dx_compose_start_list(out_da);
+
+        uint32_t idx = 0;
+        dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
+        while (trace_item) {
+            dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+            dx_compose_insert_string_iterator(out_da, iter);
+            idx++;
+            trace_item = dx_parse_sub_value(trace, idx);
+        }
+
+        dx_compose_insert_string(out_da, router->router_id);
+        dx_compose_end_list(out_da);
+    }
+
+    //
+    // If there is no ingress field, annotate the ingress as this router
+    //
+    if (!ingress) {
+        dx_compose_insert_string(out_da, "qdx.ingress");
+        dx_compose_insert_string(out_da, router->router_id);
+    }
+
+    dx_compose_end_map(out_da);
+
+    dx_message_set_delivery_annotations(msg, out_da);
+    dx_compose_free(out_da);
+}
+
+
 /**
  * Inbound Delivery Handler
  */
@@ -339,6 +389,11 @@ static void router_rx_handler(void* cont
                 //
 
                 //
+                // Interpret and update the delivery annotations of the message
+                //
+                router_annotate_message(router, msg);
+
+                //
                 // Forward to the in-process handler for this message if there 
is one.  The
                 // actual invocation of the handler will occur later after 
we've released
                 // the lock.
@@ -789,6 +844,13 @@ void dx_router_free(dx_router_t *router)
 }
 
 
+const char *dx_router_id(const dx_dispatch_t *dx)
+{
+    dx_router_t *router = dx->router;
+    return router->router_id;
+}
+
+
 dx_address_t *dx_router_register_address(dx_dispatch_t        *dx,
                                          const char           *address,
                                          dx_router_message_cb  handler,



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to