Author: tross
Date: Thu Feb  5 21:46:28 2015
New Revision: 1657699

URL: http://svn.apache.org/r1657699
Log:
DISPATCH-6 - Added dynamic-source link-routing.
    This feature uses the x-opt-qd.address option in dynamic-node-properties
    to determine how to route a listener with a dynamic source.

Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/amqp.h
    qpid/dispatch/trunk/include/qpid/dispatch/container.h
    qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
    qpid/dispatch/trunk/src/amqp.c
    qpid/dispatch/trunk/src/container.c
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/tests/config-2-broker/A.conf
    qpid/dispatch/trunk/tests/config-2-broker/B.conf

Modified: qpid/dispatch/trunk/include/qpid/dispatch/amqp.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/amqp.h?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/amqp.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/amqp.h Thu Feb  5 21:46:28 2015
@@ -107,6 +107,11 @@ const char * const QD_CAPABILITY_ANONYMO
 const char * const QD_CAPABILITY_ROUTER;
 /// @}
 
+/** @name Dynamic Node Properties */
+/// @{
+const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS;  ///< Address for routing 
dynamic sources
+/// @}
+
 /** @name Miscellaneous Strings */
 /// @{
 const char * const QD_INTERNODE_LINK_NAME_1;

Modified: qpid/dispatch/trunk/include/qpid/dispatch/container.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/container.h?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/container.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/container.h Thu Feb  5 21:46:28 
2015
@@ -106,6 +106,9 @@ typedef struct {
     qd_container_link_detach_handler_t link_detach_handler;
     ///@}
 
+    /** Invoked when a link we created was opened by the peer */
+    qd_container_link_handler_t link_attach_handler;
+
     /** @name Node-Type Handlers
      * @{
      */

Modified: qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/iterator.h?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/iterator.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/iterator.h Thu Feb  5 21:46:28 
2015
@@ -83,7 +83,7 @@ typedef struct qd_field_iterator_t qd_fi
  *     amqp:/_topo/all/all/<local>
  *                        L^^^^^^^
  *     amqp:/<mobile>
- *          M^^^^^^^^
+ *         M0^^^^^^^^
  *
  * ITER_VIEW_NODE_HASH - Isolate the hashable part of a router-id, used for 
headers
  *

Modified: qpid/dispatch/trunk/src/amqp.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/amqp.c?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/amqp.c (original)
+++ qpid/dispatch/trunk/src/amqp.c Thu Feb  5 21:46:28 2015
@@ -27,6 +27,8 @@ const char * const QD_MA_CLASS   = "x-op
 const char * const QD_CAPABILITY_ROUTER          = "qd.router";
 const char * const QD_CAPABILITY_ANONYMOUS_RELAY = "ANONYMOUS-RELAY";
 
+const char * const QD_DYNAMIC_NODE_PROPERTY_ADDRESS = "x-opt-qd.address";
+
 const char * const QD_INTERNODE_LINK_NAME_1 = "qd.internode.1";
 const char * const QD_INTERNODE_LINK_NAME_2 = "qd.internode.2";
 

Modified: qpid/dispatch/trunk/src/container.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/container.c?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/container.c (original)
+++ qpid/dispatch/trunk/src/container.c Thu Feb  5 21:46:28 2015
@@ -184,6 +184,16 @@ static void setup_incoming_link(qd_conta
 }
 
 
+static void handle_link_open(qd_container_t *container, pn_link_t *pn_link)
+{
+    qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
+    if (link == 0)
+        return;
+    if (link->node->ntype->link_attach_handler)
+        link->node->ntype->link_attach_handler(link->node->context, link);
+}
+
+
 static int do_writable(pn_link_t *pn_link)
 {
     qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link);
@@ -361,7 +371,8 @@ static int process_handler(qd_container_
                     setup_outgoing_link(container, pn_link);
                 else
                     setup_incoming_link(container, pn_link);
-            }
+            } else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
+                handle_link_open(container, pn_link);
             break;
 
         case PN_LINK_REMOTE_CLOSE :

Modified: qpid/dispatch/trunk/src/router_node.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Thu Feb  5 21:46:28 2015
@@ -268,6 +268,33 @@ static int qd_router_terminus_is_router(
 
 
 /**
+ * If the terminus has a dynamic-node-property for a node address,
+ * return an interator for the content of that property.
+ */
+static const char *qd_router_terminus_dnp_address(pn_terminus_t *term)
+{
+    pn_data_t *props = pn_terminus_properties(term);
+
+    if (!props)
+        return 0;
+
+    pn_data_rewind(props);
+    if (pn_data_next(props) && pn_data_enter(props) && pn_data_next(props)) {
+        pn_bytes_t sym = pn_data_get_symbol(props);
+        if (sym.start && strcmp(QD_DYNAMIC_NODE_PROPERTY_ADDRESS, sym.start) 
== 0) {
+            if (pn_data_next(props)) {
+                pn_bytes_t val = pn_data_get_string(props);
+                if (val.start && *val.start != '\0')
+                    return val.start;
+            }
+        }
+    }
+
+    return 0;
+}
+
+
+/**
  * Generate a temporary routable address for a destination connected to this
  * router node.
  */
@@ -775,6 +802,7 @@ static void router_rx_handler(void* cont
     sys_mutex_lock(router->lock);
     qd_router_link_t *clink = rlink->connected_link;
     if (clink) {
+        pn_link_flow(pn_link, 1); // TODO - remove this when link-propagation 
is complete
         router_do_link_route_LH(clink, delivery, msg);
         sys_mutex_unlock(router->lock);
         return;
@@ -1035,13 +1063,13 @@ ALLOC_DECLARE(link_attach_t);
 ALLOC_DEFINE(link_attach_t);
 
 
-typedef struct link_detach_t {
+typedef struct link_event_t {
     qd_router_t      *router;
     qd_router_link_t *rlink;
-} link_detach_t;
+} link_event_t;
 
-ALLOC_DECLARE(link_detach_t);
-ALLOC_DEFINE(link_detach_t);
+ALLOC_DECLARE(link_event_t);
+ALLOC_DEFINE(link_event_t);
 
 
 typedef enum {
@@ -1094,26 +1122,45 @@ static void qd_router_attach_routed_link
 
 static void qd_router_detach_routed_link(void *context, bool discard)
 {
-    link_detach_t *ld = (link_detach_t*) context;
+    link_event_t *le = (link_event_t*) context;
 
     if (!discard) {
-        qd_link_t *link = ld->rlink->link;
+        qd_link_t *link = le->rlink->link;
         qd_link_close(link);
 
-        sys_mutex_lock(ld->router->lock);
-        qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, ld->rlink);
-        DEQ_REMOVE(ld->router->links, ld->rlink);
-        sys_mutex_unlock(ld->router->lock);
+        sys_mutex_lock(le->router->lock);
+        qd_entity_cache_remove(QD_ROUTER_LINK_TYPE, le->rlink);
+        DEQ_REMOVE(le->router->links, le->rlink);
+        sys_mutex_unlock(le->router->lock);
     }
 
-    free_link_detach_t(ld);
+    free_link_event_t(le);
 }
 
 
-link_attach_result_t qd_router_link_route(qd_router_t      *router,
-                                          qd_router_link_t *rlink,
-                                          const char       *term_addr,
-                                          qd_direction_t    dir)
+static void qd_router_open_routed_link(void *context, bool discard)
+{
+    link_event_t *le = (link_event_t*) context;
+
+    if (!discard) {
+        qd_link_t *link = le->rlink->link;
+
+        if (le->rlink->connected_link) {
+            qd_link_t *peer = le->rlink->connected_link->link;
+            pn_terminus_copy(qd_link_source(link), 
qd_link_remote_source(peer));
+            pn_terminus_copy(qd_link_target(link), 
qd_link_remote_target(peer));
+            pn_link_open(qd_link_pn(link));
+        }
+    }
+
+    free_link_event_t(le);
+}
+
+
+link_attach_result_t qd_router_link_route_LH(qd_router_t      *router,
+                                             qd_router_link_t *rlink,
+                                             const char       *term_addr,
+                                             qd_direction_t    dir)
 {
     //
     // Lookup the target address to see if we can link-route this attach.
@@ -1218,22 +1265,37 @@ static int router_incoming_link_handler(
     //
     link_attach_result_t la_result = LINK_ATTACH_NO_MATCH;
     if (!is_router)
-        la_result = qd_router_link_route(router, rlink, r_tgt, QD_OUTGOING);
+        la_result = qd_router_link_route_LH(router, rlink, r_tgt, QD_OUTGOING);
     sys_mutex_unlock(router->lock);
 
     pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
     pn_terminus_copy(qd_link_target(link), qd_link_remote_target(link));
 
-    //
-    // If link-routing was successful or there was no matching link-route
-    // address, open the link.  If link-routing was supposed to work but
-    // there was no reachable destination, close the link.
-    //
-    if (la_result == LINK_ATTACH_NO_PATH)
-        pn_link_close(pn_link);
-    else {
+    switch (la_result) {
+    case LINK_ATTACH_NO_MATCH:
+        //
+        // We didn't link-route this attach.  It terminates here.
+        // Open it in the reverse direction.
+        //
         pn_link_flow(pn_link, 1000);
         pn_link_open(pn_link);
+        break;
+
+    case LINK_ATTACH_NO_PATH:
+        //
+        // The link should be routable but there is no path to the
+        // destination.  Close the link.
+        //
+        pn_link_close(pn_link);
+        break;
+
+    case LINK_ATTACH_FORWARDED:
+        //
+        // We routed the attach outbound.  Don't open the link back until
+        // the downstream link is opened.
+        //
+        pn_link_flow(pn_link, 1000);  // TODO - remove this when flow 
propagation is complete
+        break;
     }
 
     return 0;
@@ -1342,7 +1404,11 @@ static int router_outgoing_link_handler(
         // address, that address needs to be set up in the address list.
         //
         char temp_addr[1000]; // TODO: Use pn_string or aprintf.
-        la_result = qd_router_link_route(router, rlink, r_src, QD_INCOMING);
+        const char *link_route_address = 
qd_router_terminus_dnp_address(qd_link_remote_source(link));
+
+        if (link_route_address == 0)
+            link_route_address = r_src;
+        la_result = qd_router_link_route_LH(router, rlink, link_route_address, 
QD_INCOMING);
 
         if (la_result == LINK_ATTACH_NO_MATCH) {
             if (is_dynamic) {
@@ -1390,7 +1456,55 @@ static int router_outgoing_link_handler(
 
     if (iter)
         qd_field_iterator_free(iter);
-    pn_link_open(pn_link);
+
+    switch (la_result) {
+    case LINK_ATTACH_NO_MATCH:
+        //
+        // We didn't link-route this attach.  It terminates here.
+        // Open it in the reverse direction.
+        //
+        pn_link_open(pn_link);
+        break;
+
+    case LINK_ATTACH_NO_PATH:
+        //
+        // The link should be routable but there is no path to the
+        // destination.  Close the link.
+        //
+        pn_link_close(pn_link);
+        break;
+
+    case LINK_ATTACH_FORWARDED:
+        //
+        // We routed the attach outbound.  Don't open the link back until
+        // the downstream link is opened.
+        //
+        break;
+    }
+
+    return 0;
+}
+
+
+/**
+ * Handler for remote opening of links that we initiated.
+ */
+static int router_link_attach_handler(void* context, qd_link_t *link)
+{
+    qd_router_t      *router     = (qd_router_t*) context;
+    qd_router_link_t *rlink      = (qd_router_link_t*) 
qd_link_get_context(link);
+    qd_router_link_t *peer_rlink = rlink->connected_link;
+
+    if (peer_rlink) {
+        qd_connection_t *out_conn = qd_link_connection(peer_rlink->link);
+        link_event_t    *le       = new_link_event_t();
+
+        le->router = router;
+        le->rlink  = peer_rlink;
+
+        qd_connection_invoke_deferred(out_conn, qd_router_open_routed_link, 
le);
+    }
+    
     return 0;
 }
 
@@ -1413,11 +1527,11 @@ static int router_link_detach_handler(vo
 
     if (rlink->connected_link) {
         qd_connection_t *out_conn = 
qd_link_connection(rlink->connected_link->link);
-        link_detach_t   *ld       = new_link_detach_t();
-        ld->router  = router;
-        ld->rlink   = rlink->connected_link;
+        link_event_t    *le       = new_link_event_t();
+        le->router  = router;
+        le->rlink   = rlink->connected_link;
         rlink->connected_link->connected_link = 0;
-        qd_connection_invoke_deferred(out_conn, qd_router_detach_routed_link, 
ld);
+        qd_connection_invoke_deferred(out_conn, qd_router_detach_routed_link, 
le);
     }
 
     //
@@ -1615,6 +1729,7 @@ static qd_node_type_t router_node = {"ro
                                      router_outgoing_link_handler,
                                      router_writable_link_handler,
                                      router_link_detach_handler,
+                                     router_link_attach_handler,
                                      0,   // node_created_handler
                                      0,   // node_destroyed_handler
                                      router_inbound_open_handler,

Modified: qpid/dispatch/trunk/tests/config-2-broker/A.conf
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config-2-broker/A.conf?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config-2-broker/A.conf (original)
+++ qpid/dispatch/trunk/tests/config-2-broker/A.conf Thu Feb  5 21:46:28 2015
@@ -67,7 +67,7 @@ router {
 connector {
     name: broker
     role: on-demand
-    addr: 0.0.0.0
+    addr: 127.0.0.1
     port: 11000
     sasl-mechanisms: ANONYMOUS
 }
@@ -82,6 +82,11 @@ linkRoutePattern {
     connector: broker
 }
 
+linkRoutePattern {
+    prefix: qmf.
+    connector: broker
+}
+
 fixed-address {
     prefix: /closest/
     fanout: single

Modified: qpid/dispatch/trunk/tests/config-2-broker/B.conf
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config-2-broker/B.conf?rev=1657699&r1=1657698&r2=1657699&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config-2-broker/B.conf (original)
+++ qpid/dispatch/trunk/tests/config-2-broker/B.conf Thu Feb  5 21:46:28 2015
@@ -54,7 +54,7 @@ listener {
 
 connector {
     role: inter-router
-    addr: 0.0.0.0
+    addr: 127.0.0.1
     port: 20102
     sasl-mechanisms: ANONYMOUS
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to