Repository: qpid-dispatch Updated Branches: refs/heads/master 2a03648c8 -> 9d08d0a67
DISPATCH-127 - Carry the ingress address phase across inter-router links so waypoints can be accessed from remote routers. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9d08d0a6 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9d08d0a6 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9d08d0a6 Branch: refs/heads/master Commit: 9d08d0a678d71c275ab0c05d4f28b1549630e132 Parents: 2a03648 Author: Ted Ross <[email protected]> Authored: Thu Mar 31 12:59:14 2016 -0400 Committer: Ted Ross <[email protected]> Committed: Thu Mar 31 12:59:14 2016 -0400 ---------------------------------------------------------------------- include/qpid/dispatch/amqp.h | 1 + include/qpid/dispatch/message.h | 10 ++++++++++ include/qpid/dispatch/router_core.h | 11 +++++++++++ src/amqp.c | 1 + src/message.c | 19 ++++++++++++++++++ src/message_private.h | 2 +- src/router_core/connections.c | 6 ++++++ src/router_node.c | 33 +++++++++++++++++++++++++++----- tests/system_tests_link_routes.py | 5 ----- 9 files changed, 77 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/include/qpid/dispatch/amqp.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index d4bc340..54773b5 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -94,6 +94,7 @@ enum { extern const char * const QD_MA_INGRESS; ///< Ingress Router extern const char * const QD_MA_TRACE; ///< Trace extern const char * const QD_MA_TO; ///< To-Override +extern const char * const QD_MA_PHASE; ///< Phase for override address extern const char * const QD_MA_CLASS; ///< Message-Class /// @} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/include/qpid/dispatch/message.h 
---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 7ab7b9c..66601ba 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -176,6 +176,16 @@ void qd_message_set_trace_annotation(qd_message_t *msg, qd_composed_field_t *tra void qd_message_set_to_override_annotation(qd_message_t *msg, qd_composed_field_t *to_field); /** + * Set a phase for the phase annotation in the message. + * + * @param msg Pointer to an outgoing message. + * @param phase The phase of the address for the outgoing message. + * + */ +void qd_message_set_phase_annotation(qd_message_t *msg, int phase); +int qd_message_get_phase_annotation(const qd_message_t *msg); + +/** * Set the value for the QD_MA_INGRESS field in the outgoing message * annotations for the message. * http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index 6e26b41..3956b5c 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -382,6 +382,17 @@ qd_link_type_t qdr_link_type(const qdr_link_t *link); qd_direction_t qdr_link_direction(const qdr_link_t *link); /** + * qdr_link_phase + * + * If this link is associated with an auto_link, return the address phase. Otherwise + * return zero. + * + * @param link Link object + * @return 0 or the phase of the link's auto_link. + */ +int qdr_link_phase(const qdr_link_t *link); + +/** * qdr_link_is_anonymous * * Indicate whether the link is anonymous. 
Note that this is determined inside http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/amqp.c ---------------------------------------------------------------------- diff --git a/src/amqp.c b/src/amqp.c index c02216d..656f7ef 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -22,6 +22,7 @@ const char * const QD_MA_INGRESS = "x-opt-qd.ingress"; const char * const QD_MA_TRACE = "x-opt-qd.trace"; const char * const QD_MA_TO = "x-opt-qd.to"; +const char * const QD_MA_PHASE = "x-opt-qd.phase"; const char * const QD_MA_CLASS = "x-opt-qd.class"; const char * const QD_CAPABILITY_ROUTER_CONTROL = "qd.router"; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/message.c ---------------------------------------------------------------------- diff --git a/src/message.c b/src/message.c index c8ade93..c5dc028 100644 --- a/src/message.c +++ b/src/message.c @@ -547,6 +547,7 @@ qd_message_t *qd_message() DEQ_INIT(msg->ma_to_override); DEQ_INIT(msg->ma_trace); DEQ_INIT(msg->ma_ingress); + msg->ma_phase = 0; msg->content = new_qd_message_content_t(); if (msg->content == 0) { @@ -612,6 +613,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg) qd_buffer_list_clone(&copy->ma_to_override, &msg->ma_to_override); qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace); qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress); + copy->ma_phase = msg->ma_phase; copy->content = content; @@ -666,6 +668,18 @@ void qd_message_set_to_override_annotation(qd_message_t *in_msg, qd_composed_fie qd_compose_free(to_field); } +void qd_message_set_phase_annotation(qd_message_t *in_msg, int phase) +{ + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + msg->ma_phase = phase; +} + +int qd_message_get_phase_annotation(const qd_message_t *in_msg) +{ + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + return msg->ma_phase; +} + void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field) { qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; @@ 
-790,6 +804,11 @@ static bool compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t qd_compose_insert_buffers(out_ma, &msg->ma_ingress); } + if (msg->ma_phase != 0) { + qd_compose_insert_symbol(out_ma, QD_MA_PHASE); + qd_compose_insert_int(out_ma, msg->ma_phase); + } + qd_compose_end_map(out_ma); qd_compose_take_buffers(out_ma, out); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/message_private.h ---------------------------------------------------------------------- diff --git a/src/message_private.h b/src/message_private.h index f09fa25..8ede2c7 100644 --- a/src/message_private.h +++ b/src/message_private.h @@ -93,7 +93,7 @@ typedef struct { qd_buffer_list_t ma_to_override; // to field in outgoing message annotations. qd_buffer_list_t ma_trace; // trace list in outgoing message annotations qd_buffer_list_t ma_ingress; // ingress field in outgoing message annotations - + int ma_phase; // phase for the override address } qd_message_pvt_t; ALLOC_DECLARE(qd_message_t); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 4cfc8cd..9b6e63b 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -217,6 +217,12 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link) } +int qdr_link_phase(const qdr_link_t *link) +{ + return link && link->auto_link ? 
link->auto_link->phase : 0; +} + + bool qdr_link_is_anonymous(const qdr_link_t *link) { return link->owning_addr == 0; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 4bc5ad9..41a8d9a 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -94,6 +94,7 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router, qd_parsed_field_t *trace = 0; qd_parsed_field_t *ingress = 0; qd_parsed_field_t *to = 0; + qd_parsed_field_t *phase = 0; *link_exclusions = 0; @@ -115,8 +116,10 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router, ingress = qd_parse_sub_value(in_ma, idx); } else if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_TO)) { to = qd_parse_sub_value(in_ma, idx); + } else if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_PHASE)) { + phase = qd_parse_sub_value(in_ma, idx); } - done = trace && ingress && to; + done = trace && ingress && to && phase; } } @@ -166,6 +169,15 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t *router, } // + // QD_MA_PHASE: + // Preserve the existing value. + // + if (phase) { + int phase_val = qd_parse_as_int(phase); + qd_message_set_phase_annotation(msg, phase_val); + } + + // // QD_MA_INGRESS: // If there is no ingress field, annotate the ingress as // this router else keep the original field. @@ -264,14 +276,17 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd) if (anonymous_link) { qd_field_iterator_t *addr_iter = 0; + int phase = 0; // // If the message has delivery annotations, get the to-override field from the annotations. 
// if (in_ma) { qd_parsed_field_t *ma_to = qd_parse_value_by_key(in_ma, QD_MA_TO); - if (ma_to) + if (ma_to) { addr_iter = qd_field_iterator_dup(qd_parse_raw(ma_to)); + phase = qd_message_get_phase_annotation(msg); + } } // @@ -282,15 +297,23 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd) if (addr_iter) { qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH); + if (phase > 0) + qd_address_iterator_set_phase(addr_iter, '0' + (char) phase); delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd), link_exclusions); } } else { - const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link)); - if (r_tgt) { + const char *term_addr = pn_terminus_get_address(qd_link_remote_target(link)); + if (!term_addr) + term_addr = pn_terminus_get_address(qd_link_source(link)); + + if (term_addr) { qd_composed_field_t *to_override = qd_compose_subfield(0); - qd_compose_insert_string(to_override, r_tgt); + qd_compose_insert_string(to_override, term_addr); qd_message_set_to_override_annotation(msg, to_override); + int phase = qdr_link_phase(rlink); + if (phase != 0) + qd_message_set_phase_annotation(msg, phase); } delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/tests/system_tests_link_routes.py ---------------------------------------------------------------------- diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py index 89a3fa7..6f3ca61 100644 --- a/tests/system_tests_link_routes.py +++ b/tests/system_tests_link_routes.py @@ -126,11 +126,8 @@ class LinkRoutePatternTest(TestCase): """ out = self.run_qdstat_linkRoute(self.routers[1].addresses[0]) out_list = out.split() - self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2) - self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2) 
self.assertEqual(out_list.count('in'), 1) self.assertEqual(out_list.count('out'), 1) - self.assertEqual(out_list.count('broker'), 2) def test_ccc_qdstat_link_routes_routerC(self): """ @@ -140,8 +137,6 @@ class LinkRoutePatternTest(TestCase): out = self.run_qdstat_linkRoute(self.routers[2].addresses[1]) out_list = out.split() - self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2) - self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2) self.assertEqual(out_list.count('in'), 1) self.assertEqual(out_list.count('out'), 1) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
