DISPATCH-179 - Further cleanup. Now ready to add the implementation of
the first forwarder (multicast).
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/77281d62
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/77281d62
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/77281d62
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 77281d629564a643ca23f7d72bb5f20744e2d65c
Parents: bc3ffca
Author: Ted Ross <[email protected]>
Authored: Wed Jan 6 17:26:10 2016 -0500
Committer: Ted Ross <[email protected]>
Committed: Wed Jan 6 17:26:10 2016 -0500
----------------------------------------------------------------------
python/qpid_dispatch_internal/dispatch.py | 2 +-
.../qpid_dispatch_internal/management/agent.py | 2 +-
python/qpid_dispatch_internal/router/engine.py | 1 +
src/router_core/forwarder.c | 62 ++++++++++++--------
src/router_core/management_agent.c | 14 ++---
src/router_core/route_tables.c | 2 +-
src/router_core/router_core_private.h | 18 +++---
src/router_core/router_core_thread.c | 2 +-
src/router_core/transfer.c | 14 +----
9 files changed, 61 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/python/qpid_dispatch_internal/dispatch.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/dispatch.py b/python/qpid_dispatch_internal/dispatch.py
index 7c70a75..fdc6835 100644
--- a/python/qpid_dispatch_internal/dispatch.py
+++ b/python/qpid_dispatch_internal/dispatch.py
@@ -72,7 +72,7 @@ class QdDll(ctypes.PyDLL):
self._prototype(self.qd_dispatch_router_unlock, None,
[self.qd_dispatch_p])
self._prototype(self.qd_connection_manager_start, None,
[self.qd_dispatch_p])
- self._prototype(self.qd_waypoint_activate_all, None,
[self.qd_dispatch_p])
+ #self._prototype(self.qd_waypoint_activate_all, None,
[self.qd_dispatch_p])
self._prototype(self.qd_entity_refresh_begin, c_long, [py_object])
self._prototype(self.qd_entity_refresh_end, None, [])
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py
index d1f9a0a..0750971 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -295,7 +295,7 @@ class FixedAddressEntity(EntityAdapter):
class WaypointEntity(EntityAdapter):
def create(self):
self._qd.qd_dispatch_configure_waypoint(self._dispatch, self)
- self._qd.qd_waypoint_activate_all(self._dispatch)
+ #self._qd.qd_waypoint_activate_all(self._dispatch)
class LinkRoutePatternEntity(EntityAdapter):
def create(self):
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/python/qpid_dispatch_internal/router/engine.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/engine.py
b/python/qpid_dispatch_internal/router/engine.py
index bda27aa..b3fa1a1 100644
--- a/python/qpid_dispatch_internal/router/engine.py
+++ b/python/qpid_dispatch_internal/router/engine.py
@@ -33,6 +33,7 @@ import time
## (i.e. we are in a test bench, etc.), load the stub versions.
##
from ..dispatch import IoAdapter, LogAdapter, LOG_TRACE, LOG_INFO, LOG_ERROR,
LOG_STACK_LIMIT
+from ..dispatch import SEMANTICS_MULTICAST_FLOOD, SEMANTICS_MULTICAST_ONCE
class RouterEngine:
"""
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 1ab2e39..29d4c42 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -22,9 +22,12 @@
#include <stdio.h>
typedef void (*qdr_forward_message_t) (qdr_core_t *core,
- qdr_forwarder_t *forw,
+ qdr_address_t *addr,
qd_message_t *msg,
- qdr_delivery_t *in_delivery);
+ qdr_delivery_t *in_delivery,
+ bool exclude_inprocess,
+ bool control);
+
typedef void (*qdr_forward_attach_t) (qdr_core_t *core,
qdr_forwarder_t *forw,
qdr_link_t *link);
@@ -39,33 +42,40 @@ struct qdr_forwarder_t {
// Built-in Forwarders
//==================================================================================
-void qdr_forward_multicast(qdr_core_t *core,
- qdr_forwarder_t *forw,
- qd_message_t *msg,
- qdr_delivery_t *in_delivery)
+void qdr_forward_multicast_CT(qdr_core_t *core,
+ qdr_address_t *addr,
+ qd_message_t *msg,
+ qdr_delivery_t *in_delivery,
+ bool exclude_inprocess,
+ bool control)
{
+ //bool bypass_valid_origins = addr->forwarder->bypass_valid_origins;
}
-void qdr_forward_closest(qdr_core_t *core,
- qdr_forwarder_t *forw,
- qd_message_t *msg,
- qdr_delivery_t *in_delivery)
+void qdr_forward_closest_CT(qdr_core_t *core,
+ qdr_address_t *addr,
+ qd_message_t *msg,
+ qdr_delivery_t *in_delivery,
+ bool exclude_inprocess,
+ bool control)
{
}
-void qdr_forward_balanced(qdr_core_t *core,
- qdr_forwarder_t *forw,
- qd_message_t *msg,
- qdr_delivery_t *in_delivery)
+void qdr_forward_balanced_CT(qdr_core_t *core,
+ qdr_address_t *addr,
+ qd_message_t *msg,
+ qdr_delivery_t *in_delivery,
+ bool exclude_inprocess,
+ bool control)
{
}
-void qdr_forward_link_balanced(qdr_core_t *core,
- qdr_forwarder_t *forw,
- qdr_link_t *link)
+void qdr_forward_link_balanced_CT(qdr_core_t *core,
+ qdr_forwarder_t *forw,
+ qdr_link_t *link)
{
}
@@ -91,15 +101,15 @@ void qdr_forwarder_setup_CT(qdr_core_t *core)
//
// Create message forwarders
//
- core->forwarders[QD_SEMANTICS_MULTICAST_FLOOD] =
qdr_new_forwarder(qdr_forward_multicast, 0, true);
- core->forwarders[QD_SEMANTICS_MULTICAST_ONCE] =
qdr_new_forwarder(qdr_forward_multicast, 0, false);
- core->forwarders[QD_SEMANTICS_ANYCAST_CLOSEST] =
qdr_new_forwarder(qdr_forward_closest, 0, false);
- core->forwarders[QD_SEMANTICS_ANYCAST_BALANCED] =
qdr_new_forwarder(qdr_forward_balanced, 0, false);
+ core->forwarders[QD_SEMANTICS_MULTICAST_FLOOD] =
qdr_new_forwarder(qdr_forward_multicast_CT, 0, true);
+ core->forwarders[QD_SEMANTICS_MULTICAST_ONCE] =
qdr_new_forwarder(qdr_forward_multicast_CT, 0, false);
+ core->forwarders[QD_SEMANTICS_ANYCAST_CLOSEST] =
qdr_new_forwarder(qdr_forward_closest_CT, 0, false);
+ core->forwarders[QD_SEMANTICS_ANYCAST_BALANCED] =
qdr_new_forwarder(qdr_forward_balanced_CT, 0, false);
//
// Create link forwarders
//
- core->forwarders[QD_SEMANTICS_LINK_BALANCED] = qdr_new_forwarder(0,
qdr_forward_link_balanced, false);
+ core->forwarders[QD_SEMANTICS_LINK_BALANCED] = qdr_new_forwarder(0,
qdr_forward_link_balanced_CT, false);
}
@@ -111,8 +121,14 @@ qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t seman
}
-void qdr_forward_message_CT(qdr_core_t *core, qdr_forwarder_t *forwarder,
qd_message_t *msg, qdr_delivery_t *in_delivery)
+void qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr,
qd_message_t *msg, qdr_delivery_t *in_delivery,
+ bool exclude_inprocess, bool control)
{
+ if (addr->forwarder)
+ addr->forwarder->forward_message(core, addr, msg, in_delivery,
exclude_inprocess, control);
+ else {
+ // TODO - Deal with this delivery's disposition
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c
index 3aae26a..4ab7db1 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -148,7 +148,7 @@ static void qd_set_properties(qd_message_t *msg,
}
-static void qd_manage_response_handler (void *context, const qd_amqp_error_t
*status, bool more)
+static void qd_manage_response_handler(void *context, const qd_amqp_error_t
*status, bool more)
{
qd_management_context_t *ctx = (qd_management_context_t*) context;
@@ -313,13 +313,13 @@ static void qd_core_agent_delete_handler(qdr_core_t *core,
/**
* Checks the content of the message to see if this can be handled by this
agent.
*/
-static bool qd_can_handle_request(qd_field_iterator_t *props,
- qd_router_entity_type_t *entity_type,
- qd_router_operation_type_t *operation_type,
+static bool qd_can_handle_request(qd_field_iterator_t *props,
+ qd_router_entity_type_t *entity_type,
+ qd_router_operation_type_t *operation_type,
qd_field_iterator_t **identity_iter,
qd_field_iterator_t **name_iter,
- int *count,
- int *offset)
+ int *count,
+ int *offset)
{
qd_parsed_field_t *fld = qd_parse(props);
@@ -407,7 +407,7 @@ static bool qd_can_handle_request(qd_field_iterator_t *props,
* Handler for the management agent.
*
*/
-void management_agent_handler(void *context, qd_message_t *msg, int link_id)
+void management_agent_handler(void *context, qd_message_t *msg, int
unused_link_id)
{
qdr_core_t *core = (qdr_core_t*) context;
qd_field_iterator_t *app_properties_iter = qd_message_field_iterator(msg,
QD_FIELD_APPLICATION_PROPERTIES);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 57d9427..4216498 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -560,7 +560,7 @@ static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discar
qdr_address_t *addr = 0;
char *astring = (char*) qd_field_iterator_copy(address->iterator);
- qd_log(core->log, QD_LOG_INFO, "Subscribed address=%s class=%c",
astring, aclass);
+ qd_log(core->log, QD_LOG_INFO, "In-process subscription %c/%s",
aclass, astring);
free(astring);
qd_address_iterator_override_prefix(address->iterator, aclass);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 217c333..bc94b68 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -25,10 +25,17 @@
#include <qpid/dispatch/log.h>
#include <memory.h>
-typedef struct qdr_forwarder_t qdr_forwarder_t;
+typedef struct qdr_address_t qdr_address_t;
+typedef struct qdr_node_t qdr_node_t;
+typedef struct qdr_router_ref_t qdr_router_ref_t;
+typedef struct qdr_link_ref_t qdr_link_ref_t;
+typedef struct qdr_lrp_t qdr_lrp_t;
+typedef struct qdr_lrp_ref_t qdr_lrp_ref_t;
+typedef struct qdr_forwarder_t qdr_forwarder_t;
qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t
semantics);
-void qdr_forward_message_CT(qdr_core_t *core, qdr_forwarder_t *forwarder,
qd_message_t *msg, qdr_delivery_t *in_delivery);
+void qdr_forward_message_CT(qdr_core_t *core, qdr_address_t *addr,
qd_message_t *msg, qdr_delivery_t *in_delivery,
+ bool exclude_inprocess, bool control);
void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder,
qdr_link_t *in_link);
/**
@@ -132,13 +139,6 @@ ALLOC_DECLARE(qdr_query_t);
DEQ_DECLARE(qdr_query_t, qdr_query_list_t);
-typedef struct qdr_address_t qdr_address_t;
-typedef struct qdr_node_t qdr_node_t;
-typedef struct qdr_router_ref_t qdr_router_ref_t;
-typedef struct qdr_link_ref_t qdr_link_ref_t;
-typedef struct qdr_lrp_t qdr_lrp_t;
-typedef struct qdr_lrp_ref_t qdr_lrp_ref_t;
-
struct qdr_node_t {
DEQ_LINKS(qdr_node_t);
qdr_address_t *owning_addr;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c
index 28e979f..b359602 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -36,9 +36,9 @@ void *router_core_thread(void *arg)
qdr_action_list_t action_list;
qdr_action_t *action;
+ qdr_forwarder_setup_CT(core);
qdr_route_table_setup_CT(core);
qdr_agent_setup_CT(core);
- qdr_forwarder_setup_CT(core);
qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s",
core->router_area, core->router_id);
while (core->running) {
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/77281d62/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 0a2ae66..48d3446 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -105,18 +105,6 @@ void *qdr_delivery_get_context(qdr_delivery_t *delivery)
// In-Thread Functions
//==================================================================================
-static void qdr_route_message_CT(qdr_core_t *core,
- qdr_address_t *addr,
- qd_message_t *msg,
- qdr_delivery_t *dlv,
- bool exclude_inprocess,
- bool control)
-{
- const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
- printf("qdr_route_message_CT - %s, %s\n", key, exclude_inprocess ? "yes" :
"no");
-}
-
-
static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool
discard)
{
if (discard)
@@ -141,7 +129,7 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
qd_address_iterator_reset_view(addr_field->iterator,
ITER_VIEW_ADDRESS_HASH);
qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**)
&addr);
if (addr)
- qdr_route_message_CT(core, addr, msg, 0,
action->args.io.exclude_inprocess, action->args.io.control);
+ qdr_forward_message_CT(core, addr, msg, 0,
action->args.io.exclude_inprocess, action->args.io.control);
else
qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown
address");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]