DISPATCH-179 - Further cleanup.  Now ready to add the implementation of
               the first forwarder (multicast).


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/77281d62
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/77281d62
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/77281d62

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 77281d629564a643ca23f7d72bb5f20744e2d65c
Parents: bc3ffca
Author: Ted Ross <[email protected]>
Authored: Wed Jan 6 17:26:10 2016 -0500
Committer: Ted Ross <[email protected]>
Committed: Wed Jan 6 17:26:10 2016 -0500

----------------------------------------------------------------------
 python/qpid_dispatch_internal/dispatch.py       |  2 +-
 .../qpid_dispatch_internal/management/agent.py  |  2 +-
 python/qpid_dispatch_internal/router/engine.py  |  1 +
 src/router_core/forwarder.c                     | 62 ++++++++++++--------
 src/router_core/management_agent.c              | 14 ++---
 src/router_core/route_tables.c                  |  2 +-
 src/router_core/router_core_private.h           | 18 +++---
 src/router_core/router_core_thread.c            |  2 +-
 src/router_core/transfer.c                      | 14 +----
 9 files changed, 61 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/python/qpid_dispatch_internal/dispatch.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py
index 7c70a75..fdc6835 100644
--- a/python/qpid_dispatch_internal/dispatch.py
+++ b/python/qpid_dispatch_internal/dispatch.py
@@ -72,7 +72,7 @@ class QdDll(ctypes.PyDLL):
         self._prototype(self.qd_dispatch_router_unlock, None, [self.qd_dispatch_p])
 
         self._prototype(self.qd_connection_manager_start, None, [self.qd_dispatch_p])
-        self._prototype(self.qd_waypoint_activate_all, None, [self.qd_dispatch_p])
+        #self._prototype(self.qd_waypoint_activate_all, None, [self.qd_dispatch_p])
         self._prototype(self.qd_entity_refresh_begin, c_long, [py_object])
         self._prototype(self.qd_entity_refresh_end, None, [])
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index d1f9a0a..0750971 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -295,7 +295,7 @@ class FixedAddressEntity(EntityAdapter):
 class WaypointEntity(EntityAdapter):
     def create(self):
         self._qd.qd_dispatch_configure_waypoint(self._dispatch, self)
-        self._qd.qd_waypoint_activate_all(self._dispatch)
+        #self._qd.qd_waypoint_activate_all(self._dispatch)
 
 class LinkRoutePatternEntity(EntityAdapter):
     def create(self):

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/python/qpid_dispatch_internal/router/engine.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/engine.py b/python/qpid_dispatch_internal/router/engine.py
index bda27aa..b3fa1a1 100644
--- a/python/qpid_dispatch_internal/router/engine.py
+++ b/python/qpid_dispatch_internal/router/engine.py
@@ -33,6 +33,7 @@ import time
 ## (i.e. we are in a test bench, etc.), load the stub versions.
 ##
 from ..dispatch import IoAdapter, LogAdapter, LOG_TRACE, LOG_INFO, LOG_ERROR, LOG_STACK_LIMIT
+from ..dispatch import SEMANTICS_MULTICAST_FLOOD, SEMANTICS_MULTICAST_ONCE
 
 class RouterEngine:
     """

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 1ab2e39..29d4c42 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -22,9 +22,12 @@
 #include <stdio.h>
 
 typedef void (*qdr_forward_message_t) (qdr_core_t      *core,
-                                       qdr_forwarder_t *forw,
+                                       qdr_address_t   *addr,
                                        qd_message_t    *msg,
-                                       qdr_delivery_t  *in_delivery);
+                                       qdr_delivery_t  *in_delivery,
+                                       bool             exclude_inprocess,
+                                       bool             control);
+
 typedef void (*qdr_forward_attach_t) (qdr_core_t      *core,
                                       qdr_forwarder_t *forw,
                                       qdr_link_t      *link);
@@ -39,33 +42,40 @@ struct qdr_forwarder_t {
 // Built-in Forwarders
 //==================================================================================
 
-void qdr_forward_multicast(qdr_core_t      *core,
-                           qdr_forwarder_t *forw,
-                           qd_message_t    *msg,
-                           qdr_delivery_t  *in_delivery)
+void qdr_forward_multicast_CT(qdr_core_t      *core,
+                              qdr_address_t   *addr,
+                              qd_message_t    *msg,
+                              qdr_delivery_t  *in_delivery,
+                              bool             exclude_inprocess,
+                              bool             control)
 {
+    //bool bypass_valid_origins = addr->forwarder->bypass_valid_origins;
 }
 
 
-void qdr_forward_closest(qdr_core_t      *core,
-                         qdr_forwarder_t *forw,
-                         qd_message_t    *msg,
-                         qdr_delivery_t  *in_delivery)
+void qdr_forward_closest_CT(qdr_core_t      *core,
+                            qdr_address_t   *addr,
+                            qd_message_t    *msg,
+                            qdr_delivery_t  *in_delivery,
+                            bool             exclude_inprocess,
+                            bool             control)
 {
 }
 
 
-void qdr_forward_balanced(qdr_core_t      *core,
-                          qdr_forwarder_t *forw,
-                          qd_message_t    *msg,
-                          qdr_delivery_t  *in_delivery)
+void qdr_forward_balanced_CT(qdr_core_t      *core,
+                             qdr_address_t   *addr,
+                             qd_message_t    *msg,
+                             qdr_delivery_t  *in_delivery,
+                             bool             exclude_inprocess,
+                             bool             control)
 {
 }
 
 
-void qdr_forward_link_balanced(qdr_core_t      *core,
-                               qdr_forwarder_t *forw,
-                               qdr_link_t      *link)
+void qdr_forward_link_balanced_CT(qdr_core_t      *core,
+                                  qdr_forwarder_t *forw,
+                                  qdr_link_t      *link)
 {
 }
 
@@ -91,15 +101,15 @@ void qdr_forwarder_setup_CT(qdr_core_t *core)
     //
     // Create message forwarders
     //
-    core->forwarders[QD_SEMANTICS_MULTICAST_FLOOD]  = qdr_new_forwarder(qdr_forward_multicast, 0, true);
-    core->forwarders[QD_SEMANTICS_MULTICAST_ONCE]   = qdr_new_forwarder(qdr_forward_multicast, 0, false);
-    core->forwarders[QD_SEMANTICS_ANYCAST_CLOSEST]  = qdr_new_forwarder(qdr_forward_closest,   0, false);
-    core->forwarders[QD_SEMANTICS_ANYCAST_BALANCED] = qdr_new_forwarder(qdr_forward_balanced,  0, false);
+    core->forwarders[QD_SEMANTICS_MULTICAST_FLOOD]  = qdr_new_forwarder(qdr_forward_multicast_CT, 0, true);
+    core->forwarders[QD_SEMANTICS_MULTICAST_ONCE]   = qdr_new_forwarder(qdr_forward_multicast_CT, 0, false);
+    core->forwarders[QD_SEMANTICS_ANYCAST_CLOSEST]  = qdr_new_forwarder(qdr_forward_closest_CT,   0, false);
+    core->forwarders[QD_SEMANTICS_ANYCAST_BALANCED] = qdr_new_forwarder(qdr_forward_balanced_CT,  0, false);
 
     //
     // Create link forwarders
     //
-    core->forwarders[QD_SEMANTICS_LINK_BALANCED] = qdr_new_forwarder(0, qdr_forward_link_balanced, false);
+    core->forwarders[QD_SEMANTICS_LINK_BALANCED] = qdr_new_forwarder(0, qdr_forward_link_balanced_CT, false);
 }
 
 
@@ -111,8 +121,14 @@ qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t seman
 }
 
 
-void qdr_forward_message_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qd_message_t *msg, qdr_delivery_t *in_delivery)
+void qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
+                            bool exclude_inprocess, bool control)
 {
+    if (addr->forwarder)
+        addr->forwarder->forward_message(core, addr, msg, in_delivery, exclude_inprocess, control);
+    else {
+        // TODO - Deal with this delivery's disposition
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index 3aae26a..4ab7db1 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -148,7 +148,7 @@ static void qd_set_properties(qd_message_t        *msg,
 }
 
 
-static void qd_manage_response_handler (void *context, const qd_amqp_error_t *status, bool more)
+static void qd_manage_response_handler(void *context, const qd_amqp_error_t *status, bool more)
 {
     qd_management_context_t *ctx = (qd_management_context_t*) context;
 
@@ -313,13 +313,13 @@ static void qd_core_agent_delete_handler(qdr_core_t       
          *core,
 /**
  * Checks the content of the message to see if this can be handled by this agent.
  */
-static bool qd_can_handle_request(qd_field_iterator_t        *props,
-                                  qd_router_entity_type_t    *entity_type,
-                                  qd_router_operation_type_t *operation_type,
+static bool qd_can_handle_request(qd_field_iterator_t         *props,
+                                  qd_router_entity_type_t     *entity_type,
+                                  qd_router_operation_type_t  *operation_type,
                                   qd_field_iterator_t        **identity_iter,
                                   qd_field_iterator_t        **name_iter,
-                                  int                        *count,
-                                  int                        *offset)
+                                  int                         *count,
+                                  int                         *offset)
 {
     qd_parsed_field_t *fld = qd_parse(props);
 
@@ -407,7 +407,7 @@ static bool qd_can_handle_request(qd_field_iterator_t       
 *props,
  * Handler for the management agent.
  *
  */
-void management_agent_handler(void *context, qd_message_t *msg, int link_id)
+void management_agent_handler(void *context, qd_message_t *msg, int unused_link_id)
 {
     qdr_core_t *core = (qdr_core_t*) context;
     qd_field_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 57d9427..4216498 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -560,7 +560,7 @@ static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discar
         qdr_address_t *addr = 0;
 
         char *astring = (char*) qd_field_iterator_copy(address->iterator);
-        qd_log(core->log, QD_LOG_INFO, "Subscribed address=%s class=%c", astring, aclass);
+        qd_log(core->log, QD_LOG_INFO, "In-process subscription %c/%s", aclass, astring);
         free(astring);
 
         qd_address_iterator_override_prefix(address->iterator, aclass);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 217c333..bc94b68 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -25,10 +25,17 @@
 #include <qpid/dispatch/log.h>
 #include <memory.h>
 
-typedef struct qdr_forwarder_t qdr_forwarder_t;
+typedef struct qdr_address_t     qdr_address_t;
+typedef struct qdr_node_t        qdr_node_t;
+typedef struct qdr_router_ref_t  qdr_router_ref_t;
+typedef struct qdr_link_ref_t    qdr_link_ref_t;
+typedef struct qdr_lrp_t         qdr_lrp_t;
+typedef struct qdr_lrp_ref_t     qdr_lrp_ref_t;
+typedef struct qdr_forwarder_t   qdr_forwarder_t;
 
 qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t semantics);
-void qdr_forward_message_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qd_message_t *msg, qdr_delivery_t *in_delivery);
+void qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr, qd_message_t *msg, qdr_delivery_t *in_delivery,
+                            bool exclude_inprocess, bool control);
 void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qdr_link_t *in_link);
 
 /**
@@ -132,13 +139,6 @@ ALLOC_DECLARE(qdr_query_t);
 DEQ_DECLARE(qdr_query_t, qdr_query_list_t); 
 
 
-typedef struct qdr_address_t     qdr_address_t;
-typedef struct qdr_node_t        qdr_node_t;
-typedef struct qdr_router_ref_t  qdr_router_ref_t;
-typedef struct qdr_link_ref_t    qdr_link_ref_t;
-typedef struct qdr_lrp_t         qdr_lrp_t;
-typedef struct qdr_lrp_ref_t     qdr_lrp_ref_t;
-
 struct qdr_node_t {
     DEQ_LINKS(qdr_node_t);
     qdr_address_t    *owning_addr;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index 28e979f..b359602 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -36,9 +36,9 @@ void *router_core_thread(void *arg)
     qdr_action_list_t  action_list;
     qdr_action_t      *action;
 
+    qdr_forwarder_setup_CT(core);
     qdr_route_table_setup_CT(core);
     qdr_agent_setup_CT(core);
-    qdr_forwarder_setup_CT(core);
 
     qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", core->router_area, core->router_id);
     while (core->running) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 0a2ae66..48d3446 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -105,18 +105,6 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
 // In-Thread Functions
 //==================================================================================
 
-static void qdr_route_message_CT(qdr_core_t     *core,
-                                 qdr_address_t  *addr,
-                                 qd_message_t   *msg,
-                                 qdr_delivery_t *dlv,
-                                 bool            exclude_inprocess,
-                                 bool            control)
-{
-    const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
-    printf("qdr_route_message_CT - %s, %s\n", key, exclude_inprocess ? "yes" : "no");
-}
-
-
 static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     if (discard)
@@ -141,7 +129,7 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
         qd_address_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH);
         qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr);
         if (addr)
-            qdr_route_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control);
+            qdr_forward_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess, action->args.io.control);
         else
             qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address");
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to