Author: tross
Date: Wed Sep 4 21:42:22 2013
New Revision: 1520148
URL: http://svn.apache.org/r1520148
Log:
QPID-5068 - Added feature to allow for modification of Delivery Annotations
- Added annotation for ingress router
- Added annotation for trace
Modified:
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
qpid/trunk/qpid/extras/dispatch/src/compose.c
qpid/trunk/qpid/extras/dispatch/src/message.c
qpid/trunk/qpid/extras/dispatch/src/message_private.h
qpid/trunk/qpid/extras/dispatch/src/python_embedded.c
qpid/trunk/qpid/extras/dispatch/src/router_node.c
Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/message.h Wed Sep 4
21:42:22 2013
@@ -24,6 +24,7 @@
#include <qpid/dispatch/iterator.h>
#include <qpid/dispatch/buffer.h>
#include <qpid/dispatch/compose.h>
+#include <qpid/dispatch/parse.h>
#include <qpid/dispatch/container.h>
// Callback for status change (confirmed persistent, loaded-in-memory, etc.)
@@ -88,22 +89,89 @@ typedef enum {
DX_FIELD_REPLY_TO_GROUP_ID
} dx_message_field_t;
-//
-// Functions for allocation
-//
+
+/**
+ * Allocate a new message.
+ *
+ * @return A pointer to a dx_message_t that is the sole reference to a newly
allocated
+ * message.
+ */
dx_message_t *dx_allocate_message(void);
-void dx_free_message(dx_message_t *qm);
-dx_message_t *dx_message_copy(dx_message_t *qm);
-int dx_message_persistent(dx_message_t *qm);
-int dx_message_in_memory(dx_message_t *qm);
-//
-// Functions for received messages
-//
+/**
+ * Free a message reference. If this is the last reference to the message,
free the
+ * message as well.
+ *
+ * @param msg A pointer to a dx_message_t that is no longer needed.
+ */
+void dx_free_message(dx_message_t *msg);
+
+/**
+ * Make a new reference to an existing message.
+ *
+ * @param msg A pointer to a dx_message_t referencing a message.
+ * @return A new pointer to the same referenced message.
+ */
+dx_message_t *dx_message_copy(dx_message_t *msg);
+
+/**
+ * Retrieve the delivery annotations from a message.
+ *
+ * IMPORTANT: The pointer returned by this function remains owned by the
message.
+ * The caller MUST NOT free the parsed field.
+ *
+ * @param msg Pointer to a received message.
+ * @return Pointer to the parsed field for the delivery annotations. If the
message doesn't
+ * have delivery annotations, the return value shall be NULL.
+ */
+dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *msg);
+
+/**
+ * Set the delivery annotations for the message. If the message already has
delivery annotations,
+ * they will be overwritten/replaced by the new field.
+ *
+ * @param msg Pointer to a received message.
+ * @param da Pointer to a composed field representing the new delivery
annotations of the message.
+ * If null, the message will not have a delivery annotations field.
+ * IMPORTANT: The message will not take ownership of the composed field. The
+ * caller is responsible for freeing it after this call. The field's buffers
+ * are moved into the message (leaving the composed field empty), so it is
+ * safe to free the composed field any time after the call to this function.
+ */
+void dx_message_set_delivery_annotations(dx_message_t *msg,
dx_composed_field_t *da);
+
+/**
+ * Receive message data via a delivery. This function may be called more than
once on the same
+ * delivery if the message spans multiple frames. Once a complete message has
been received, this
+ * function shall return the message.
+ *
+ * @param delivery An incoming delivery from a link
+ * @return A pointer to the complete message or 0 if the message is not yet
complete.
+ */
dx_message_t *dx_message_receive(dx_delivery_t *delivery);
+
+/**
+ * Send the message outbound on an outgoing link.
+ *
+ * @param msg A pointer to a message to be sent.
+ * @param link The outgoing link on which to send the message.
+ */
void dx_message_send(dx_message_t *msg, dx_link_t *link);
+/**
+ * Check that the message is well-formed up to a certain depth. Any part of
the message that is
+ * beyond the specified depth is not checked for validity.
+ */
int dx_message_check(dx_message_t *msg, dx_message_depth_t depth);
+
+/**
+ * Return an iterator for the requested message field. If the field is not in
the message,
+ * return NULL.
+ *
+ * @param msg A pointer to a message.
+ * @param field The field to be returned via iterator.
+ * @return A field iterator that spans the requested field.
+ */
dx_field_iterator_t *dx_message_field_iterator(dx_message_t *msg,
dx_message_field_t field);
ssize_t dx_message_field_length(dx_message_t *msg, dx_message_field_t field);
Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h Wed Sep 4
21:42:22 2013
@@ -29,6 +29,7 @@ typedef struct dx_address_t dx_address_t
typedef void (*dx_router_message_cb)(void *context, dx_message_t *msg);
+const char *dx_router_id(const dx_dispatch_t *dx);
dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
const char *address,
Modified: qpid/trunk/qpid/extras/dispatch/src/compose.c
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/compose.c?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/compose.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/compose.c Wed Sep 4 21:42:22 2013
@@ -233,12 +233,14 @@ void dx_compose_free(dx_composed_field_t
while (buf) {
DEQ_REMOVE_HEAD(field->buffers);
dx_free_buffer(buf);
+ buf = DEQ_HEAD(field->buffers);
}
dx_composite_t *comp = DEQ_HEAD(field->fieldStack);
while (comp) {
DEQ_REMOVE_HEAD(field->fieldStack);
free_dx_composite_t(comp);
+ comp = DEQ_HEAD(field->fieldStack);
}
free_dx_composed_field_t(field);
Modified: qpid/trunk/qpid/extras/dispatch/src/message.c
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message.c?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message.c Wed Sep 4 21:42:22 2013
@@ -25,11 +25,35 @@
#include <string.h>
#include <stdio.h>
+static const unsigned char * const MSG_HDR_LONG = (unsigned
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
+static const unsigned char * const MSG_HDR_SHORT = (unsigned
char*) "\x00\x53\x70";
+static const unsigned char * const DELIVERY_ANNOTATION_LONG = (unsigned
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
+static const unsigned char * const DELIVERY_ANNOTATION_SHORT = (unsigned
char*) "\x00\x53\x71";
+static const unsigned char * const MESSAGE_ANNOTATION_LONG = (unsigned
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
+static const unsigned char * const MESSAGE_ANNOTATION_SHORT = (unsigned
char*) "\x00\x53\x72";
+static const unsigned char * const PROPERTIES_LONG = (unsigned
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
+static const unsigned char * const PROPERTIES_SHORT = (unsigned
char*) "\x00\x53\x73";
+static const unsigned char * const APPLICATION_PROPERTIES_LONG = (unsigned
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
+static const unsigned char * const APPLICATION_PROPERTIES_SHORT = (unsigned
char*) "\x00\x53\x74";
+static const unsigned char * const BODY_DATA_LONG = (unsigned
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
+static const unsigned char * const BODY_DATA_SHORT = (unsigned
char*) "\x00\x53\x75";
+static const unsigned char * const BODY_SEQUENCE_LONG = (unsigned
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
+static const unsigned char * const BODY_SEQUENCE_SHORT = (unsigned
char*) "\x00\x53\x76";
+static const unsigned char * const BODY_VALUE_LONG = (unsigned
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
+static const unsigned char * const BODY_VALUE_SHORT = (unsigned
char*) "\x00\x53\x77";
+static const unsigned char * const FOOTER_LONG = (unsigned
char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
+static const unsigned char * const FOOTER_SHORT = (unsigned
char*) "\x00\x53\x78";
+static const unsigned char * const TAGS_LIST = (unsigned
char*) "\x45\xc0\xd0";
+static const unsigned char * const TAGS_MAP = (unsigned
char*) "\xc1\xd1";
+static const unsigned char * const TAGS_BINARY = (unsigned
char*) "\xa0\xb0";
+static const unsigned char * const TAGS_ANY = (unsigned
char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
+
ALLOC_DEFINE_CONFIG(dx_message_t, sizeof(dx_message_pvt_t), 0, 0);
ALLOC_DEFINE(dx_message_content_t);
+typedef void (*buffer_process_t) (void *context, const unsigned char *base,
int length);
-static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume)
+static void advance(unsigned char **cursor, dx_buffer_t **buffer, int consume,
buffer_process_t handler, void *context)
{
unsigned char *local_cursor = *cursor;
dx_buffer_t *local_buffer = *buffer;
@@ -37,9 +61,13 @@ static void advance(unsigned char **curs
int remaining = dx_buffer_size(local_buffer) - (local_cursor -
dx_buffer_base(local_buffer));
while (consume > 0) {
if (consume < remaining) {
+ if (handler)
+ handler(context, local_cursor, consume);
local_cursor += consume;
consume = 0;
} else {
+ if (handler)
+ handler(context, local_cursor, remaining);
consume -= remaining;
local_buffer = local_buffer->next;
if (local_buffer == 0){
@@ -59,7 +87,7 @@ static void advance(unsigned char **curs
static unsigned char next_octet(unsigned char **cursor, dx_buffer_t **buffer)
{
unsigned char result = **cursor;
- advance(cursor, buffer, 1);
+ advance(cursor, buffer, 1, 0, 0);
return result;
}
@@ -103,7 +131,7 @@ static int traverse_field(unsigned char
field->parsed = 1;
}
- advance(cursor, buffer, consume);
+ advance(cursor, buffer, consume, 0, 0);
return 1;
}
@@ -249,7 +277,7 @@ static int dx_check_and_advance(dx_buffe
location->length = pre_consume + consume;
if (consume)
- advance(&test_cursor, &test_buffer, consume);
+ advance(&test_cursor, &test_buffer, consume, 0, 0);
*cursor = test_cursor;
*buffer = test_buffer;
@@ -354,6 +382,7 @@ dx_message_t *dx_allocate_message()
msg->content->lock = sys_mutex();
msg->content->ref_count = 1;
msg->content->parse_depth = DX_DEPTH_NONE;
+ msg->content->parsed_delivery_annotations = 0;
return (dx_message_t*) msg;
}
@@ -370,14 +399,23 @@ void dx_free_message(dx_message_t *in_ms
sys_mutex_unlock(content->lock);
if (rc == 0) {
- dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+ if (content->parsed_delivery_annotations)
+ dx_parse_free(content->parsed_delivery_annotations);
+ dx_buffer_t *buf = DEQ_HEAD(content->buffers);
while (buf) {
DEQ_REMOVE_HEAD(content->buffers);
dx_free_buffer(buf);
buf = DEQ_HEAD(content->buffers);
}
+ buf = DEQ_HEAD(content->new_delivery_annotations);
+ while (buf) {
+ DEQ_REMOVE_HEAD(content->new_delivery_annotations);
+ dx_free_buffer(buf);
+ buf = DEQ_HEAD(content->new_delivery_annotations);
+ }
+
sys_mutex_free(content->lock);
free_dx_message_content_t(content);
}
@@ -406,6 +444,42 @@ dx_message_t *dx_message_copy(dx_message
}
+/**
+ * Return the parsed delivery annotations of a message, parsing them on first use.
+ *
+ * The parsed field is cached in the message content and remains owned by the
+ * message (dx_free_message releases it); the caller must not free it.
+ *
+ * @param in_msg Pointer to a message.
+ * @return The parsed delivery-annotations map, or 0 if the message has no
+ *         delivery-annotations field or the field does not parse as a valid map.
+ */
+dx_parsed_field_t *dx_message_delivery_annotations(dx_message_t *in_msg)
+{
+    dx_message_pvt_t     *msg     = (dx_message_pvt_t*) in_msg;
+    dx_message_content_t *content = msg->content;
+
+    if (content->parsed_delivery_annotations)
+        return content->parsed_delivery_annotations;
+
+    dx_field_iterator_t *da = dx_message_field_iterator(in_msg, DX_FIELD_DELIVERY_ANNOTATION);
+    if (da == 0)
+        return 0;
+
+    //
+    // Parse into a local first.  The result is only published in the content
+    // once it has been validated, so a rejected (and freed) parse can never be
+    // handed out by a later call or freed a second time by dx_free_message.
+    //
+    dx_parsed_field_t *parsed = dx_parse(da);
+    if (parsed == 0 || !dx_parse_ok(parsed) || !dx_parse_is_map(parsed)) {
+        dx_field_iterator_free(da);
+        if (parsed)
+            dx_parse_free(parsed);
+        return 0;
+    }
+
+    //
+    // NOTE(review): as in the original code, the iterator is not released on
+    // the success path; confirm whether dx_parse retains a reference to it.
+    //
+    content->parsed_delivery_annotations = parsed;
+    return parsed;
+}
+
+
+/**
+ * Set the new (outgoing) delivery annotations of a message, replacing any
+ * annotations set by an earlier call.
+ *
+ * The buffers of the composed field are moved into the message; the composed
+ * field itself is left empty and must still be freed by the caller.
+ *
+ * @param msg Pointer to a message.
+ * @param da  Composed field holding the new delivery annotations, or 0 to
+ *            discard any previously set annotations.
+ *            NOTE(review): with da == 0 the message is sent with whatever
+ *            annotations it was received with; stripping the original field on
+ *            send is not implemented here.
+ */
+void dx_message_set_delivery_annotations(dx_message_t *msg, dx_composed_field_t *da)
+{
+    dx_message_content_t *content = MSG_CONTENT(msg);
+
+    //
+    // Release annotations left by a previous call so that setting them again
+    // replaces them rather than leaking the old buffer chain.
+    //
+    dx_buffer_t *old = DEQ_HEAD(content->new_delivery_annotations);
+    while (old) {
+        DEQ_REMOVE_HEAD(content->new_delivery_annotations);
+        dx_free_buffer(old);
+        old = DEQ_HEAD(content->new_delivery_annotations);
+    }
+
+    if (da == 0)
+        return;
+
+    dx_buffer_list_t *field_buffers = dx_compose_buffers(da);
+
+    content->new_delivery_annotations = *field_buffers;
+    DEQ_INIT(*field_buffers); // Zero out the linkage to the now moved buffers.
+}
+
+
dx_message_t *dx_message_receive(dx_delivery_t *delivery)
{
pn_delivery_t *pnd = dx_delivery_pn(delivery);
@@ -486,14 +560,74 @@ dx_message_t *dx_message_receive(dx_deli
}
+/**
+ * Buffer-processing callback (see buffer_process_t) that writes one run of
+ * message bytes to a Proton link.
+ *
+ * @param context The pn_link_t to write to, passed as an opaque pointer.
+ * @param start   First byte of the run to send.
+ * @param length  Number of bytes in the run.
+ */
+static void send_handler(void *context, const unsigned char *start, int length)
+{
+    pn_link_send((pn_link_t*) context, (const char*) start, length);
+}
+
+
+/**
+ * Write a message to the Proton link behind an outgoing dx_link_t.
+ *
+ * If no new delivery annotations have been set, every buffer of the message
+ * is written unchanged.  Otherwise the message is written in sections: a
+ * short-form header descriptor followed by the header section (if present),
+ * the new delivery-annotation buffers, and then the rest of the original
+ * message with the original delivery-annotation section skipped.
+ *
+ * Nothing is sent if the message does not check out through the delivery
+ * annotations.  Return values of pn_link_send are not examined.
+ *
+ * @param in_msg The message to send.
+ * @param link   The outgoing link.
+ */
void dx_message_send(dx_message_t *in_msg, dx_link_t *link)
{
- dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
- dx_buffer_t *buf = DEQ_HEAD(msg->content->buffers);
+ dx_message_pvt_t *msg = (dx_message_pvt_t*) in_msg;
+ dx_message_content_t *content = msg->content;
+ dx_buffer_t *buf = DEQ_HEAD(content->buffers);
+ unsigned char *cursor;
+ pn_link_t *pnl = dx_link_pn(link);
+
+ if (DEQ_SIZE(content->new_delivery_annotations) > 0) {
+ //
+ // This is the case where the delivery annotations have been modified.
+ // The message send must be divided into sections: The existing
header;
+ // the new delivery annotations; the rest of the existing message.
+ // Note that the original delivery annotations that are still in the
+ // buffer chain must not be sent.
+ //
+ // Start by making sure that we've parsed the message sections through
+ // the delivery annotations
+ //
+ if (!dx_message_check(in_msg, DX_DEPTH_DELIVERY_ANNOTATIONS))
+ return;
+
+ //
+ // Send header if present
+ //
+ // NOTE(review): the descriptor is re-sent explicitly in short form, which
+ // presumes the section offset/length cover only the bytes after the
+ // original descriptor - confirm against dx_check_and_advance.
+ //
+ cursor = dx_buffer_base(buf);
+ if (content->section_message_header.length > 0) {
+ pn_link_send(pnl, (const char*) MSG_HDR_SHORT, 3);
+ buf = content->section_message_header.buffer;
+ cursor = content->section_message_header.offset +
dx_buffer_base(buf);
+ advance(&cursor, &buf, content->section_message_header.length,
send_handler, (void*) pnl);
+ }
+
+ //
+ // Send new delivery annotations
+ //
+ dx_buffer_t *da_buf = DEQ_HEAD(content->new_delivery_annotations);
+ while (da_buf) {
+ pn_link_send(pnl, (char*) dx_buffer_base(da_buf),
dx_buffer_size(da_buf));
+ da_buf = DEQ_NEXT(da_buf);
+ }
+
+ //
+ // Skip over replaced delivery annotations
+ //
+ // NOTE(review): no handler is passed, so the skipped bytes are not sent.
+ // buf is not checked for 0 before this call - confirm it cannot be 0 here.
+ //
+ if (content->section_delivery_annotation.length > 0)
+ advance(&cursor, &buf,
content->section_delivery_annotation.length, 0, 0);
+
+ //
+ // Send remaining partial buffer
+ //
+ if (buf) {
+ size_t len = dx_buffer_size(buf) - (cursor - dx_buffer_base(buf));
+ advance(&cursor, &buf, len, send_handler, (void*) pnl);
+ }
+
+ // Fall through to process the remaining buffers normally
+ }
- // TODO - Handle cases where annotations have been added or modified
while (buf) {
- pn_link_send(dx_link_pn(link), (char*) dx_buffer_base(buf),
dx_buffer_size(buf));
+ pn_link_send(pnl, (char*) dx_buffer_base(buf), dx_buffer_size(buf));
buf = DEQ_NEXT(buf);
}
}
@@ -523,29 +657,6 @@ static int dx_check_field_LH(dx_message_
static int dx_message_check_LH(dx_message_content_t *content,
dx_message_depth_t depth)
{
- static const unsigned char * const MSG_HDR_LONG =
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x70";
- static const unsigned char * const MSG_HDR_SHORT =
(unsigned char*) "\x00\x53\x70";
- static const unsigned char * const DELIVERY_ANNOTATION_LONG =
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x71";
- static const unsigned char * const DELIVERY_ANNOTATION_SHORT =
(unsigned char*) "\x00\x53\x71";
- static const unsigned char * const MESSAGE_ANNOTATION_LONG =
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x72";
- static const unsigned char * const MESSAGE_ANNOTATION_SHORT =
(unsigned char*) "\x00\x53\x72";
- static const unsigned char * const PROPERTIES_LONG =
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73";
- static const unsigned char * const PROPERTIES_SHORT =
(unsigned char*) "\x00\x53\x73";
- static const unsigned char * const APPLICATION_PROPERTIES_LONG =
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x74";
- static const unsigned char * const APPLICATION_PROPERTIES_SHORT =
(unsigned char*) "\x00\x53\x74";
- static const unsigned char * const BODY_DATA_LONG =
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x75";
- static const unsigned char * const BODY_DATA_SHORT =
(unsigned char*) "\x00\x53\x75";
- static const unsigned char * const BODY_SEQUENCE_LONG =
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x76";
- static const unsigned char * const BODY_SEQUENCE_SHORT =
(unsigned char*) "\x00\x53\x76";
- static const unsigned char * const BODY_VALUE_LONG =
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x77";
- static const unsigned char * const BODY_VALUE_SHORT =
(unsigned char*) "\x00\x53\x77";
- static const unsigned char * const FOOTER_LONG =
(unsigned char*) "\x00\x80\x00\x00\x00\x00\x00\x00\x00\x78";
- static const unsigned char * const FOOTER_SHORT =
(unsigned char*) "\x00\x53\x78";
- static const unsigned char * const TAGS_LIST =
(unsigned char*) "\x45\xc0\xd0";
- static const unsigned char * const TAGS_MAP =
(unsigned char*) "\xc1\xd1";
- static const unsigned char * const TAGS_BINARY =
(unsigned char*) "\xa0\xb0";
- static const unsigned char * const TAGS_ANY =
(unsigned char*) "\x45\xc0\xd0\xc1\xd1\xa0\xb0";
-
dx_buffer_t *buffer = DEQ_HEAD(content->buffers);
if (!buffer)
Modified: qpid/trunk/qpid/extras/dispatch/src/message_private.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/message_private.h?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/message_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/message_private.h Wed Sep 4 21:42:22
2013
@@ -58,11 +58,15 @@ typedef struct {
// 1) Received message is held and forwarded unmodified - single
buffer list
// 2) Received message is held and modified before forwarding - two
buffer lists
// 3) Message is composed internally - single buffer list
+// TODO - provide a way to allocate a message without a lock for the
link-routing case.
+// It's likely that link-routing will cause no contention for the
message content.
+//
typedef struct {
sys_mutex_t *lock;
- uint32_t ref_count; // The number of
qmessages referencing this
+ uint32_t ref_count; // The number of
messages referencing this
dx_buffer_list_t buffers; // The buffer chain
containing the message
+ dx_buffer_list_t new_delivery_annotations; // The buffer chain
containing the new delivery annotations
dx_field_location_t section_message_header; // The message
header list
dx_field_location_t section_delivery_annotation; // The delivery
annotation map
dx_field_location_t section_message_annotation; // The message
annotation map
@@ -77,10 +81,11 @@ typedef struct {
dx_buffer_t *parse_buffer;
unsigned char *parse_cursor;
dx_message_depth_t parse_depth;
+ dx_parsed_field_t *parsed_delivery_annotations;
} dx_message_content_t;
typedef struct {
- DEQ_LINKS(dx_message_t); // Deq linkage that
overlays the dx_message_t
+ DEQ_LINKS(dx_message_t); // Deque linkage that overlays the dx_message_t
dx_message_content_t *content;
} dx_message_pvt_t;
Modified: qpid/trunk/qpid/extras/dispatch/src/python_embedded.c
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/python_embedded.c?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/python_embedded.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/python_embedded.c Wed Sep 4 21:42:22
2013
@@ -495,18 +495,28 @@ static void IoAdapter_dealloc(IoAdapter*
static PyObject* dx_python_send(PyObject *self, PyObject *args)
{
- IoAdapter *ioa = (IoAdapter*) self;
- const char *address;
- PyObject *app_properties;
- PyObject *body;
+ IoAdapter *ioa = (IoAdapter*) self;
+ dx_composed_field_t *field = 0;
+ const char *address;
+ PyObject *app_properties;
+ PyObject *body;
+
if (!PyArg_ParseTuple(args, "sOO", &address, &app_properties, &body))
return 0;
- dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_HEADER, 0);
+ field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, field);
+ dx_compose_start_map(field);
+
+ dx_compose_insert_string(field, "qdx.ingress");
+ dx_compose_insert_string(field, dx_router_id(ioa->dx));
+
+ dx_compose_insert_string(field, "qdx.trace");
dx_compose_start_list(field);
- dx_compose_insert_bool(field, 0); // durable
+ dx_compose_insert_string(field, dx_router_id(ioa->dx));
dx_compose_end_list(field);
+ dx_compose_end_map(field);
+
field = dx_compose(DX_PERFORMATIVE_PROPERTIES, field);
dx_compose_start_list(field);
dx_compose_insert_null(field); // message-id
Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1520148&r1=1520147&r2=1520148&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Wed Sep 4 21:42:22 2013
@@ -250,6 +250,56 @@ static int router_writable_link_handler(
}
+/**
+ * Rebuild the delivery annotations of an inbound message before it is forwarded.
+ *
+ * A fresh delivery-annotations map is composed and installed on the message:
+ *   - "qdx.trace":   if the received annotations carry a trace list, it is
+ *                    copied and this router's ID is appended.
+ *   - "qdx.ingress": the received value is preserved if present; otherwise
+ *                    this router's ID is recorded as the ingress.
+ *
+ * @param router The router handling the message (supplies router_id).
+ * @param msg    The received message to annotate.
+ */
+static void router_annotate_message(dx_router_t *router, dx_message_t *msg)
+{
+    dx_parsed_field_t   *in_da  = dx_message_delivery_annotations(msg);
+    dx_composed_field_t *out_da = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+
+    dx_parsed_field_t *trace   = 0;
+    dx_parsed_field_t *ingress = 0;
+
+    if (in_da) {
+        trace   = dx_parse_value_by_key(in_da, "qdx.trace");
+        ingress = dx_parse_value_by_key(in_da, "qdx.ingress");
+    }
+
+    dx_compose_start_map(out_da);
+
+    //
+    // If there is a trace field, append this router's ID to the trace.
+    //
+    if (trace && dx_parse_is_list(trace)) {
+        dx_compose_insert_string(out_da, "qdx.trace");
+        dx_compose_start_list(out_da);
+
+        uint32_t           idx        = 0;
+        dx_parsed_field_t *trace_item = dx_parse_sub_value(trace, idx);
+        while (trace_item) {
+            dx_field_iterator_t *iter = dx_parse_raw(trace_item);
+            dx_compose_insert_string_iterator(out_da, iter);
+            idx++;
+            trace_item = dx_parse_sub_value(trace, idx);
+        }
+
+        dx_compose_insert_string(out_da, router->router_id);
+        dx_compose_end_list(out_da);
+    }
+
+    //
+    // Always emit an ingress entry.  The new map replaces the received
+    // annotations wholesale when the message is sent, so an existing ingress
+    // value must be copied across or it would be lost after the first hop.
+    // If there is no ingress field, annotate the ingress as this router.
+    //
+    dx_compose_insert_string(out_da, "qdx.ingress");
+    if (ingress)
+        dx_compose_insert_string_iterator(out_da, dx_parse_raw(ingress));
+    else
+        dx_compose_insert_string(out_da, router->router_id);
+
+    dx_compose_end_map(out_da);
+
+    //
+    // The message takes over the composed buffers; the (now empty) composed
+    // field is still ours to free.
+    //
+    dx_message_set_delivery_annotations(msg, out_da);
+    dx_compose_free(out_da);
+}
+
+
/**
* Inbound Delivery Handler
*/
@@ -339,6 +389,11 @@ static void router_rx_handler(void* cont
//
//
+ // Interpret and update the delivery annotations of the message
+ //
+ router_annotate_message(router, msg);
+
+ //
// Forward to the in-process handler for this message if there
is one. The
// actual invocation of the handler will occur later after
we've released
// the lock.
@@ -789,6 +844,13 @@ void dx_router_free(dx_router_t *router)
}
+/**
+ * Return the ID string of the router attached to a dispatch instance.
+ *
+ * @param dx The dispatch instance.
+ * @return The router's router_id; the string is not copied.
+ */
+const char *dx_router_id(const dx_dispatch_t *dx)
+{
+    const dx_router_t *self = dx->router;
+
+    return self->router_id;
+}
+
+
dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
const char *address,
dx_router_message_cb handler,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]