Author: tross
Date: Thu Feb 5 21:46:28 2015
New Revision: 1657699
URL: http://svn.apache.org/r1657699
Log:
DISPATCH-6 - Added dynamic-source link-routing.
This feature uses the x-opt-qd.address option in dynamic-node-properties
to determine how to route a listener with a dynamic source.
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/amqp.h
qpid/dispatch/trunk/include/qpid/dispatch/container.h
qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
qpid/dispatch/trunk/src/amqp.c
qpid/dispatch/trunk/src/container.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/tests/config-2-broker/A.conf
qpid/dispatch/trunk/tests/config-2-broker/B.conf
Modified: qpid/dispatch/trunk/include/qpid/dispatch/amqp.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/amqp.h?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/amqp.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/amqp.h Thu Feb 5 21:46:28 2015
@@ -107,6 +107,11 @@ const char * const QD_CAPABILITY_ANONYMO
const char * const QD_CAPABILITY_ROUTER;
/// @}
+/** @name Dynamic Node Properties */
+/// @{
+const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS; ///< Address for routing
dynamic sources
+/// @}
+
/** @name Miscellaneous Strings */
/// @{
const char * const QD_INTERNODE_LINK_NAME_1;
Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Thu Feb 5 21:46:28
2015
@@ -106,6 +106,9 @@ typedef struct {
qd_container_link_detach_handler_t link_detach_handler;
///@}
+ /** Invoked when a link we created was opened by the peer */
+ qd_container_link_handler_t link_attach_handler;
+
/** @name Node-Type Handlers
* @{
*/
Modified: qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/iterator.h?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/iterator.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/iterator.h Thu Feb 5 21:46:28
2015
@@ -83,7 +83,7 @@ typedef struct qd_field_iterator_t qd_fi
* amqp:/_topo/all/all/<local>
* L^^^^^^^
* amqp:/<mobile>
- * M^^^^^^^^
+ * M0^^^^^^^^
*
* ITER_VIEW_NODE_HASH - Isolate the hashable part of a router-id, used for
headers
*
Modified: qpid/dispatch/trunk/src/amqp.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/amqp.c?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/amqp.c (original)
+++ qpid/dispatch/trunk/src/amqp.c Thu Feb 5 21:46:28 2015
@@ -27,6 +27,8 @@ const char * const QD_MA_CLASS = "x-op
const char * const QD_CAPABILITY_ROUTER = "qd.router";
const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY";
+const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS = "x-opt-qd.address";
+
const char * const QD_INTERNODE_LINK_NAME_1 = "qd.internode.1";
const char * const QD_INTERNODE_LINK_NAME_2 = "qd.internode.2";
Modified: qpid/dispatch/trunk/src/container.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Thu Feb 5 21:46:28 2015
@@ -184,6 +184,16 @@ static void setup_incoming_link(qd_conta
}
+static void handle_link_open(qd_container_t *container, pn_link_t *pn_link)
+{
+ qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
+ if (link == 0)
+ return;
+ if (link->node->ntype->link_attach_handler)
+ link->node->ntype->link_attach_handler(link->node->context, link);
+}
+
+
static int do_writable(pn_link_t *pn_link)
{
qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
@@ -361,7 +371,8 @@ static int process_handler(qd_container_
setup_outgoing_link(container, pn_link);
else
setup_incoming_link(container, pn_link);
- }
+ } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
+ handle_link_open(container, pn_link);
break;
case PN_LINK_REMOTE_CLOSE :
Modified: qpid/dispatch/trunk/src/router_node.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Thu Feb 5 21:46:28 2015
@@ -268,6 +268,33 @@ static int qd_router_terminus_is_router(
/**
+ * If the terminus has a dynamic-node-property for a node address,
+ * return an interator for the content of that property.
+ */
+static const char *qd_router_terminus_dnp_address(pn_terminus_t *term)
+{
+ pn_data_t *props = pn_terminus_properties(term);
+
+ if (!props)
+ return 0;
+
+ pn_data_rewind(props);
+ if (pn_data_next(props) && pn_data_enter(props) && pn_data_next(props)) {
+ pn_bytes_t sym = pn_data_get_symbol(props);
+ if (sym.start && strcmp(QD_DYNAMIC_NODE_PROPERTY_ADDRESS, sym.start)
== 0) {
+ if (pn_data_next(props)) {
+ pn_bytes_t val = pn_data_get_string(props);
+ if (val.start && *val.start != '\0')
+ return val.start;
+ }
+ }
+ }
+
+ return 0;
+}
+
+
+/**
* Generate a temporary routable address for a destination connected to this
* router node.
*/
@@ -775,6 +802,7 @@ static void router_rx_handler(void* cont
sys_mutex_lock(router->lock);
qd_router_link_t *clink = rlink->connected_link;
if (clink) {
+ pn_link_flow(pn_link, 1); // TODO - remove this when link-propagation
is complete
router_do_link_route_LH(clink, delivery, msg);
sys_mutex_unlock(router->lock);
return;
@@ -1035,13 +1063,13 @@ ALLOC_DECLARE(link_attach_t);
ALLOC_DEFINE(link_attach_t);
-typedef struct link_detach_t {
+typedef struct link_event_t {
qd_router_t *router;
qd_router_link_t *rlink;
-} link_detach_t;
+} link_event_t;
-ALLOC_DECLARE(link_detach_t);
-ALLOC_DEFINE(link_detach_t);
+ALLOC_DECLARE(link_event_t);
+ALLOC_DEFINE(link_event_t);
typedef enum {
@@ -1094,26 +1122,45 @@ static void qd_router_attach_routed_link
static void qd_router_detach_routed_link(void *context, bool discard)
{
- link_detach_t *ld = (link_detach_t*) context;
+ link_event_t *le = (link_event_t*) context;
if (!discard) {
- qd_link_t *link = ld->rlink->link;
+ qd_link_t *link = le->rlink->link;
qd_link_close(link);
- sys_mutex_lock(ld->router->lock);
- qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, ld->rlink);
- DEQ_REMOVE(ld->router->links, ld->rlink);
- sys_mutex_unlock(ld->router->lock);
+ sys_mutex_lock(le->router->lock);
+ qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, le->rlink);
+ DEQ_REMOVE(le->router->links, le->rlink);
+ sys_mutex_unlock(le->router->lock);
}
- free_link_detach_t(ld);
+ free_link_event_t(le);
}
-link_attach_result_t qd_router_link_route(qd_router_t *router,
- qd_router_link_t *rlink,
- const char *term_addr,
- qd_direction_t dir)
+static void qd_router_open_routed_link(void *context, bool discard)
+{
+ link_event_t *le = (link_event_t*) context;
+
+ if (!discard) {
+ qd_link_t *link = le->rlink->link;
+
+ if (le->rlink->connected_link) {
+ qd_link_t *peer = le->rlink->connected_link->link;
+ pn_terminus_copy(qd_link_source(link),
qd_link_remote_source(peer));
+ pn_terminus_copy(qd_link_target(link),
qd_link_remote_target(peer));
+ pn_link_open(qd_link_pn(link));
+ }
+ }
+
+ free_link_event_t(le);
+}
+
+
+link_attach_result_t qd_router_link_route_LH(qd_router_t *router,
+ qd_router_link_t *rlink,
+ const char *term_addr,
+ qd_direction_t dir)
{
//
// Lookup the target address to see if we can link-route this attach.
@@ -1218,22 +1265,37 @@ static int router_incoming_link_handler(
//
link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
if (!is_router)
- la_result = qd_router_link_route(router, rlink, r_tgt, QD_OUTGOING);
+ la_result = qd_router_link_route_LH(router, rlink, r_tgt, QD_OUTGOING);
sys_mutex_unlock(router->lock);
pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
- //
- // If link-routing was successful or there was no matching link-route
- // address, open the link. If link-routing was supposed to work but
- // there was no reachable destination, close the link.
- //
- if (la_result == LINK_ATTACH_NO_PATH)
- pn_link_close(pn_link);
- else {
+ switch (la_result) {
+ case LINK_ATTACH_NO_MATCH:
+ //
+ // We didn't link-route this attach. It terminates here.
+ // Open it in the reverse direction.
+ //
pn_link_flow(pn_link, 1000);
pn_link_open(pn_link);
+ break;
+
+ case LINK_ATTACH_NO_PATH:
+ //
+ // The link should be routable but there is no path to the
+ // destination. Close the link.
+ //
+ pn_link_close(pn_link);
+ break;
+
+ case LINK_ATTACH_FORWARDED:
+ //
+ // We routed the attach outbound. Don't open the link back until
+ // the downstream link is opened.
+ //
+ pn_link_flow(pn_link, 1000); // TODO - remove this when flow
propagation is complete
+ break;
}
return 0;
@@ -1342,7 +1404,11 @@ static int router_outgoing_link_handler(
// address, that address needs to be set up in the address list.
//
char temp_addr[1000]; // TODO: Use pn_string or aprintf.
- la_result = qd_router_link_route(router, rlink, r_src, QD_INCOMING);
+ const char *link_route_address =
qd_router_terminus_dnp_address(qd_link_remote_source(link));
+
+ if (link_route_address == 0)
+ link_route_address = r_src;
+ la_result = qd_router_link_route_LH(router, rlink, link_route_address,
QD_INCOMING);
if (la_result == LINK_ATTACH_NO_MATCH) {
if (is_dynamic) {
@@ -1390,7 +1456,55 @@ static int router_outgoing_link_handler(
if (iter)
qd_field_iterator_free(iter);
- pn_link_open(pn_link);
+
+ switch (la_result) {
+ case LINK_ATTACH_NO_MATCH:
+ //
+ // We didn't link-route this attach. It terminates here.
+ // Open it in the reverse direction.
+ //
+ pn_link_open(pn_link);
+ break;
+
+ case LINK_ATTACH_NO_PATH:
+ //
+ // The link should be routable but there is no path to the
+ // destination. Close the link.
+ //
+ pn_link_close(pn_link);
+ break;
+
+ case LINK_ATTACH_FORWARDED:
+ //
+ // We routed the attach outbound. Don't open the link back until
+ // the downstream link is opened.
+ //
+ break;
+ }
+
+ return 0;
+}
+
+
+/**
+ * Handler for remote opening of links that we initiated.
+ */
+static int router_link_attach_handler(void* context, qd_link_t *link)
+{
+ qd_router_t *router = (qd_router_t*) context;
+ qd_router_link_t *rlink = (qd_router_link_t*)
qd_link_get_context(link);
+ qd_router_link_t *peer_rlink = rlink->connected_link;
+
+ if (peer_rlink) {
+ qd_connection_t *out_conn = qd_link_connection(peer_rlink->link);
+ link_event_t *le = new_link_event_t();
+
+ le->router = router;
+ le->rlink = peer_rlink;
+
+ qd_connection_invoke_deferred(out_conn, qd_router_open_routed_link,
le);
+ }
+
return 0;
}
@@ -1413,11 +1527,11 @@ static int router_link_detach_handler(vo
if (rlink->connected_link) {
qd_connection_t *out_conn =
qd_link_connection(rlink->connected_link->link);
- link_detach_t *ld = new_link_detach_t();
- ld->router = router;
- ld->rlink = rlink->connected_link;
+ link_event_t *le = new_link_event_t();
+ le->router = router;
+ le->rlink = rlink->connected_link;
rlink->connected_link->connected_link = 0;
- qd_connection_invoke_deferred(out_conn, qd_router_detach_routed_link,
ld);
+ qd_connection_invoke_deferred(out_conn, qd_router_detach_routed_link,
le);
}
//
@@ -1615,6 +1729,7 @@ static qd_node_type_t router_node = {"ro
router_outgoing_link_handler,
router_writable_link_handler,
router_link_detach_handler,
+ router_link_attach_handler,
0, // node_created_handler
0, // node_destroyed_handler
router_inbound_open_handler,
Modified: qpid/dispatch/trunk/tests/config-2-broker/A.conf
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config-2-broker/A.conf?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config-2-broker/A.conf (original)
+++ qpid/dispatch/trunk/tests/config-2-broker/A.conf Thu Feb 5 21:46:28 2015
@@ -67,7 +67,7 @@ router {
connector {
name: broker
role: on-demand
- addr: 0.0.0.0
+ addr: 127.0.0.1
port: 11000
sasl-mechanisms: ANONYMOUS
}
@@ -82,6 +82,11 @@ linkRoutePattern {
connector: broker
}
+linkRoutePattern {
+ prefix: qmf.
+ connector: broker
+}
+
fixed-address {
prefix: /closest/
fanout: single
Modified: qpid/dispatch/trunk/tests/config-2-broker/B.conf
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config-2-broker/B.conf?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config-2-broker/B.conf (original)
+++ qpid/dispatch/trunk/tests/config-2-broker/B.conf Thu Feb 5 21:46:28 2015
@@ -54,7 +54,7 @@ listener {
connector {
role: inter-router
- addr: 0.0.0.0
+ addr: 127.0.0.1
port: 20102
sasl-mechanisms: ANONYMOUS
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]