Repository: qpid-dispatch Updated Branches: refs/heads/tross-DISPATCH-179-1 40b444015 -> fe3157c51
DISPATCH-179 - Added forwarder infrastructure to the router core. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fe3157c5 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fe3157c5 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fe3157c5 Branch: refs/heads/tross-DISPATCH-179-1 Commit: fe3157c5104ee6e1b0400dd71ba28e115228dc59 Parents: 40b4440 Author: Ted Ross <[email protected]> Authored: Mon Jan 4 11:11:34 2016 -0500 Committer: Ted Ross <[email protected]> Committed: Mon Jan 4 11:11:34 2016 -0500 ---------------------------------------------------------------------- include/qpid/dispatch/router.h | 70 ++--------- python/qpid_dispatch_internal/router/engine.py | 10 +- src/CMakeLists.txt | 2 +- src/python_embedded.c | 16 ++- src/router_config.c | 41 ++++--- src/router_core/connections.c | 9 +- src/router_core/forwarder.c | 122 ++++++++++++++++++++ src/router_core/route_tables.c | 18 ++- src/router_core/router_core.c | 7 +- src/router_core/router_core_private.h | 14 ++- src/router_core/router_core_thread.c | 1 + src/router_node.c | 2 +- 12 files changed, 193 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/include/qpid/dispatch/router.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router.h b/include/qpid/dispatch/router.h index 30b9022..499e808 100644 --- a/include/qpid/dispatch/router.h +++ b/include/qpid/dispatch/router.h @@ -34,71 +34,17 @@ typedef struct qd_router_t qd_router_t; typedef struct qd_address_t qd_address_t; -typedef uint8_t qd_address_semantics_t; typedef struct qd_router_delivery_t qd_router_delivery_t; -#include <qpid/dispatch/router_core.h> - -/** - * @name Address fanout semantics - * @{ - */ -#define QD_FANOUTMASK 0x03 -#define QD_FANOUT_SINGLE 0x00 ///< Message will be delivered to a single consumer. -#define QD_FANOUT_MULTIPLE 0x01 ///< Message will be delivered to multiple consumers. -#define QD_FANOUT_GROUP 0x02 ///< Message will be delivered to one consumer per group. -#define QD_FANOUT(d) (d & QD_FANOUTMASK) ///< Get fanout bits. -///@} - -/** - * @name Address bias semantics for SINGLE/GROUP fanout - * @{ - */ - -#define QD_BIASMASK 0x0c -#define QD_BIAS_NONE 0x00 ///< Apply no bias (also used for multiple fanout). -#define QD_BIAS_CLOSEST 0x04 ///< Message will be delivered to the closest (lowest cost) consumer. -#define QD_BIAS_SPREAD 0x08 ///< Messages will be spread arbitrarily across all consumers. -#define QD_BIAS_LATENCY 0x0c ///< Messages will be spread to minimize latency in light of each consumer's rate of consumption. -#define QD_BIAS(d) (d & QD_BIASMASK) -///@} - +typedef enum { + QD_SEMANTICS_MULTICAST_FLOOD = 0, + QD_SEMANTICS_MULTICAST_ONCE = 1, + QD_SEMANTICS_ANYCAST_CLOSEST = 2, + QD_SEMANTICS_ANYCAST_BALANCED = 3, + QD_SEMANTICS_LINK_BALANCED = 4 +} qd_address_semantics_t; -/** - * @name Address congestion semantics. - * - * This controls that the router will do with - * received messages that are destined for congested destinations. - * @{ - */ -#define QD_CONGESTIONMASK 0x30 -/** Drop/Release the message.*/ -#define QD_CONGESTION_DROP 0x00 - /** - * Stop issuing replacement credits to slow the producer. This puts a cap on - * the total number of messages addressed to this address from a particular - * producer that can be buffered in the router. - */ -#define QD_CONGESTION_BACKPRESSURE 0x10 - /** Redirect messages to an alternate address. */ -#define QD_CONGESTION_REDIRECT 0x20 -#define QD_CONGESTION(d) (d & QD_CONGESTIONMASK) -/// @} - -/** @name Other semantics - * @{ - */ -#define QD_DROP_FOR_SLOW_CONSUMERS 0x40 -#define QD_BYPASS_VALID_ORIGINS 0x80 -///@} - -/** - * @name Sematics groups - * @{ - */ -#define QD_SEMANTICS_ROUTER_CONTROL (QD_FANOUT_MULTIPLE | QD_BIAS_NONE | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS) -#define QD_SEMANTICS_DEFAULT (QD_FANOUT_MULTIPLE | QD_BIAS_NONE | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS) -///@} +#include <qpid/dispatch/router_core.h> /** Message forwarding descriptor * http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/python/qpid_dispatch_internal/router/engine.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/engine.py b/python/qpid_dispatch_internal/router/engine.py index 140e5c0..bda27aa 100644 --- a/python/qpid_dispatch_internal/router/engine.py +++ b/python/qpid_dispatch_internal/router/engine.py @@ -52,11 +52,11 @@ class RouterEngine: self._log_ls = LogAdapter("ROUTER_LS") self._log_ma = LogAdapter("ROUTER_MA") self._log_general = LogAdapter("ROUTER") - self.io_adapter = [IoAdapter(self.receive, "qdrouter"), - IoAdapter(self.receive, "qdrouter.ma"), - IoAdapter(self.receive, "qdrouter", 'T'), - IoAdapter(self.receive, "qdrouter.ma", 'T'), - IoAdapter(self.receive, "qdhello")] + self.io_adapter = [IoAdapter(self.receive, "qdrouter", 'L', '0', SEMANTICS_MULTICAST_FLOOD), + IoAdapter(self.receive, "qdrouter.ma", 'L', '0', SEMANTICS_MULTICAST_ONCE), + IoAdapter(self.receive, "qdrouter", 'T', '0', SEMANTICS_MULTICAST_FLOOD), + IoAdapter(self.receive, "qdrouter.ma", 'T', '0', SEMANTICS_MULTICAST_ONCE), + IoAdapter(self.receive, "qdhello", 'L', '0', SEMANTICS_MULTICAST_FLOOD)] self.max_routers = max_routers self.id = router_id self.instance = long(time.time()) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 73b3144..e82d805 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -71,6 +71,7 @@ set(qpid_dispatch_SOURCES router_core/agent_link.c router_core/connections.c router_core/error.c + router_core/forwarder.c router_core/router_core.c router_core/router_core_thread.c router_core/route_tables.c @@ -79,7 +80,6 @@ set(qpid_dispatch_SOURCES router_core/transfer.c router_delivery.c router_node.c - router_forwarders.c router_pynode.c schema_enum.c server.c http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/python_embedded.c ---------------------------------------------------------------------- diff --git a/src/python_embedded.c b/src/python_embedded.c index 1eb4779..f8afe25 100644 --- a/src/python_embedded.c +++ b/src/python_embedded.c @@ -42,8 +42,6 @@ static PyObject *dispatch_module = 0; static PyObject *message_type = 0; static PyObject *dispatch_python_pkgdir = 0; -static qd_address_semantics_t py_semantics = QD_FANOUT_MULTIPLE | QD_BIAS_NONE | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS; - static void qd_python_setup(void); @@ -527,9 +525,10 @@ static void qd_io_rx_handler(void *context, qd_message_t *msg, int link_id) static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds) { PyObject *addr; - char aclass = 'L'; - char phase = '0'; - if (!PyArg_ParseTuple(args, "OO|cc", &self->handler, &addr, &aclass, &phase)) + char aclass = 'L'; + char phase = '0'; + int semantics = QD_SEMANTICS_ANYCAST_BALANCED; + if (!PyArg_ParseTuple(args, "OO|cci", &self->handler, &addr, &aclass, &phase, &semantics)) return -1; if (!PyCallable_Check(self->handler)) { PyErr_SetString(PyExc_TypeError, "IoAdapter.__init__ handler is not callable"); @@ -541,7 +540,7 @@ static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds) const char *address = PyString_AsString(addr); if (!address) return -1; qd_error_clear(); - self->sub = qdr_core_subscribe(self->core, address, aclass, phase, py_semantics, qd_io_rx_handler, self); + self->sub = qdr_core_subscribe(self->core, address, aclass, phase, semantics, qd_io_rx_handler, self); if (qd_error_code()) { PyErr_SetString(PyExc_RuntimeError, qd_error_message()); return -1; @@ -734,6 +733,11 @@ static void qd_python_setup(void) Py_INCREF(ioaType); PyModule_AddObject(m, "IoAdapter", (PyObject*) &IoAdapterType); + qd_register_constant(m, "SEMANTICS_MULTICAST_FLOOD", QD_SEMANTICS_MULTICAST_FLOOD); + qd_register_constant(m, "SEMANTICS_MULTICAST_ONCE", QD_SEMANTICS_MULTICAST_ONCE); + qd_register_constant(m, "SEMANTICS_ANYCAST_CLOSEST", QD_SEMANTICS_ANYCAST_CLOSEST); + qd_register_constant(m, "SEMANTICS_ANYCAST_BALANCED", QD_SEMANTICS_ANYCAST_BALANCED); + Py_INCREF(m); dispatch_module = m; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_config.c ---------------------------------------------------------------------- diff --git a/src/router_config.c b/src/router_config.c index fc3a8c0..315a468 100644 --- a/src/router_config.c +++ b/src/router_config.c @@ -76,32 +76,31 @@ qd_error_t qd_router_configure_address(qd_router_t *router, qd_entity_t *entity) qd_address_semantics_t semantics = 0; switch(fanout) { - case QD_SCHEMA_FIXEDADDRESS_FANOUT_MULTIPLE: semantics |= QD_FANOUT_MULTIPLE; break; - case QD_SCHEMA_FIXEDADDRESS_FANOUT_SINGLE: semantics |= QD_FANOUT_SINGLE; break; - default: - free(prefix); - free(addr_phase); - return qd_error(QD_ERROR_CONFIG, "Invalid fanout value %d", fanout); - } + case QD_SCHEMA_FIXEDADDRESS_FANOUT_MULTIPLE: + semantics = QD_SEMANTICS_MULTICAST_ONCE; + break; - if ((semantics & QD_FANOUTMASK) == QD_FANOUT_SINGLE) { + case QD_SCHEMA_FIXEDADDRESS_FANOUT_SINGLE: switch(bias) { - case QD_SCHEMA_FIXEDADDRESS_BIAS_CLOSEST: semantics |= QD_BIAS_CLOSEST; break; - case QD_SCHEMA_FIXEDADDRESS_BIAS_SPREAD: semantics |= QD_BIAS_SPREAD; break; - default: + case QD_SCHEMA_FIXEDADDRESS_BIAS_CLOSEST: + semantics = QD_SEMANTICS_ANYCAST_CLOSEST; + break; + + case QD_SCHEMA_FIXEDADDRESS_BIAS_SPREAD: + semantics = QD_SEMANTICS_ANYCAST_BALANCED; + break; + + default: free(prefix); free(addr_phase); return qd_error(QD_ERROR_CONFIG, "Invalid bias value %d", fanout); } - qd_log(router->log_source, QD_LOG_INFO, - "Configured Address: prefix=%s phase=%d fanout=%s bias=%s", - prefix, phase, - qd_schema_fixedAddress_fanout_names[fanout], - qd_schema_fixedAddress_bias_names[bias]); - } else { - semantics |= QD_BIAS_NONE; - qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s phase=%d fanout=%s", - prefix, phase, qd_schema_fixedAddress_fanout_names[fanout]); + break; + + default: + free(prefix); + free(addr_phase); + return qd_error(QD_ERROR_CONFIG, "Invalid fanout value %d", fanout); } addr_phase->semantics = semantics; @@ -327,5 +326,5 @@ qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_i } qd_address_iterator_reset_view(iter, old_view); - return phase ? phase->semantics : QD_SEMANTICS_DEFAULT; + return phase ? phase->semantics : QD_SEMANTICS_ANYCAST_BALANCED; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index bb6aad9..124f4d6 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -30,9 +30,6 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, b ALLOC_DEFINE(qdr_connection_t); ALLOC_DEFINE(qdr_connection_work_t); -static qd_address_semantics_t qdr_dynamic_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_BACKPRESSURE; -static qd_address_semantics_t qdr_default_semantics = QD_FANOUT_SINGLE | QD_BIAS_SPREAD | QD_CONGESTION_BACKPRESSURE; - typedef enum { QDR_CONDITION_NO_ROUTE_TO_DESTINATION, QDR_CONDITION_ROUTED_LINK_LOST, @@ -378,7 +375,7 @@ static qd_address_semantics_t qdr_semantics_for_address(qdr_core_t *core, qd_fie // static routes; yes: prevents occlusion by mobile addresses with specified semantics) // qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr); - return addr ? addr->semantics : qdr_default_semantics; + return /* addr ? addr->semantics : */ QD_SEMANTICS_ANYCAST_BALANCED; // FIXME } @@ -480,7 +477,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, qd_field_iterator_t *temp_iter = qd_address_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH); qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr); if (!addr) { - addr = qdr_address(qdr_dynamic_semantics); + addr = qdr_address_CT(core, QD_SEMANTICS_ANYCAST_CLOSEST); qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(core->addrs, addr); qdr_terminus_set_address(terminus, temp_addr); @@ -517,7 +514,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); if (!addr && create_if_not_found) { qd_address_semantics_t sem = qdr_semantics_for_address(core, iter); - addr = qdr_address(sem); + addr = qdr_address_CT(core, sem); qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(core->addrs, addr); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/forwarder.c ---------------------------------------------------------------------- diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c new file mode 100644 index 0000000..1ab2e39 --- /dev/null +++ b/src/router_core/forwarder.c @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "router_core_private.h" +#include <qpid/dispatch/amqp.h> +#include <stdio.h> + +typedef void (*qdr_forward_message_t) (qdr_core_t *core, + qdr_forwarder_t *forw, + qd_message_t *msg, + qdr_delivery_t *in_delivery); +typedef void (*qdr_forward_attach_t) (qdr_core_t *core, + qdr_forwarder_t *forw, + qdr_link_t *link); + +struct qdr_forwarder_t { + qdr_forward_message_t forward_message; + qdr_forward_attach_t forward_attach; + bool bypass_valid_origins; +}; + +//================================================================================== +// Built-in Forwarders +//================================================================================== + +void qdr_forward_multicast(qdr_core_t *core, + qdr_forwarder_t *forw, + qd_message_t *msg, + qdr_delivery_t *in_delivery) +{ +} + + +void qdr_forward_closest(qdr_core_t *core, + qdr_forwarder_t *forw, + qd_message_t *msg, + qdr_delivery_t *in_delivery) +{ +} + + +void qdr_forward_balanced(qdr_core_t *core, + qdr_forwarder_t *forw, + qd_message_t *msg, + qdr_delivery_t *in_delivery) +{ +} + + +void qdr_forward_link_balanced(qdr_core_t *core, + qdr_forwarder_t *forw, + qdr_link_t *link) +{ +} + + +//================================================================================== +// In-Thread API Functions +//================================================================================== + +qdr_forwarder_t *qdr_new_forwarder(qdr_forward_message_t fm, qdr_forward_attach_t fa, bool bypass_valid_origins) +{ + qdr_forwarder_t *forw = NEW(qdr_forwarder_t); + + forw->forward_message = fm; + forw->forward_attach = fa; + forw->bypass_valid_origins = bypass_valid_origins; + + return forw; +} + + +void qdr_forwarder_setup_CT(qdr_core_t *core) +{ + // + // Create message forwarders + // + core->forwarders[QD_SEMANTICS_MULTICAST_FLOOD] = qdr_new_forwarder(qdr_forward_multicast, 0, true); + core->forwarders[QD_SEMANTICS_MULTICAST_ONCE] = qdr_new_forwarder(qdr_forward_multicast, 0, false); + core->forwarders[QD_SEMANTICS_ANYCAST_CLOSEST] = qdr_new_forwarder(qdr_forward_closest, 0, false); + core->forwarders[QD_SEMANTICS_ANYCAST_BALANCED] = qdr_new_forwarder(qdr_forward_balanced, 0, false); + + // + // Create link forwarders + // + core->forwarders[QD_SEMANTICS_LINK_BALANCED] = qdr_new_forwarder(0, qdr_forward_link_balanced, false); +} + + +qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t semantics) +{ + if (semantics <= QD_SEMANTICS_LINK_BALANCED) + return core->forwarders[semantics]; + return 0; +} + + +void qdr_forward_message_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qd_message_t *msg, qdr_delivery_t *in_delivery) +{ +} + + +void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qdr_link_t *in_link) +{ +} + http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/route_tables.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index ad0a035..2790b39 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -32,8 +32,6 @@ static void qdr_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bo static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_unsubscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard); -static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS; - //================================================================================== // Interface Functions @@ -175,11 +173,11 @@ void qdr_route_table_setup_CT(qdr_core_t *core) DEQ_INIT(core->routers); core->addr_hash = qd_hash(10, 32, 0); - core->hello_addr = qdr_add_local_address_CT(core, 'L', "qdhello", QD_SEMANTICS_ROUTER_CONTROL); - core->router_addr_L = qdr_add_local_address_CT(core, 'L', "qdrouter", QD_SEMANTICS_ROUTER_CONTROL); - core->routerma_addr_L = qdr_add_local_address_CT(core, 'L', "qdrouter.ma", QD_SEMANTICS_DEFAULT); - core->router_addr_T = qdr_add_local_address_CT(core, 'T', "qdrouter", QD_SEMANTICS_ROUTER_CONTROL); - core->routerma_addr_T = qdr_add_local_address_CT(core, 'T', "qdrouter.ma", QD_SEMANTICS_DEFAULT); + core->hello_addr = qdr_add_local_address_CT(core, 'L', "qdhello", QD_SEMANTICS_MULTICAST_FLOOD); + core->router_addr_L = qdr_add_local_address_CT(core, 'L', "qdrouter", QD_SEMANTICS_MULTICAST_FLOOD); + core->routerma_addr_L = qdr_add_local_address_CT(core, 'L', "qdrouter.ma", QD_SEMANTICS_MULTICAST_ONCE); + core->router_addr_T = qdr_add_local_address_CT(core, 'T', "qdrouter", QD_SEMANTICS_MULTICAST_FLOOD); + core->routerma_addr_T = qdr_add_local_address_CT(core, 'T', "qdrouter.ma", QD_SEMANTICS_MULTICAST_ONCE); core->neighbor_free_mask = qd_bitmask(1); @@ -235,7 +233,7 @@ static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca // This record will be found whenever a "foreign" topological address to this // remote router is looked up. // - addr = qdr_address(router_addr_semantics); + addr = qdr_address_CT(core, QD_SEMANTICS_ANYCAST_CLOSEST); qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(core->addrs, addr); @@ -488,7 +486,7 @@ static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, bool qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); if (!addr) { - addr = qdr_address(0); // FIXME - Semantics + addr = qdr_address_CT(core, 0); // FIXME - Semantics qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); DEQ_ITEM_INIT(addr); DEQ_INSERT_TAIL(core->addrs, addr); @@ -568,7 +566,7 @@ static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discar qd_hash_retrieve(core->addr_hash, address->iterator, (void**) &addr); if (!addr) { - addr = qdr_address(action->args.io.semantics); + addr = qdr_address_CT(core, action->args.io.semantics); qd_hash_insert(core->addr_hash, address->iterator, addr, &addr->hash_handle); DEQ_ITEM_INIT(addr); DEQ_INSERT_TAIL(core->addrs, addr); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index a61e72c..0625784 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -156,12 +156,11 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action) } -qdr_address_t *qdr_address(qd_address_semantics_t semantics) +qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_semantics_t semantics) { qdr_address_t *addr = new_qdr_address_t(); ZERO(addr); - addr->semantics = semantics; - addr->forwarder = qd_router_get_forwarder(semantics); + addr->forwarder = qdr_forwarder_CT(core, semantics); return addr; } @@ -177,7 +176,7 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const cha qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); if (!addr) { - addr = qdr_address(semantics); + addr = qdr_address_CT(core, semantics); qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); DEQ_ITEM_INIT(addr); DEQ_INSERT_TAIL(core->addrs, addr); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index c135f11..0fec699 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -25,6 +25,12 @@ #include <qpid/dispatch/log.h> #include <memory.h> +typedef struct qdr_forwarder_t qdr_forwarder_t; + +qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t semantics); +void qdr_forward_message_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qd_message_t *msg, qdr_delivery_t *in_delivery); +void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, qdr_link_t *in_link); + /** * qdr_field_t - This type is used to pass variable-length fields (strings, etc.) into * and out of the router-core thread. @@ -233,11 +239,10 @@ struct qdr_address_t { qdr_link_ref_list_t inlinks; ///< Locally-Connected Producers qdr_router_ref_list_t rnodes; ///< Remotely-Connected Consumers qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry - qd_address_semantics_t semantics; + qdr_forwarder_t *forwarder; bool toggle; bool waypoint; bool block_deletion; - qd_router_forwarder_t *forwarder; /**@name Statistics */ ///@{ @@ -252,7 +257,7 @@ struct qdr_address_t { ALLOC_DECLARE(qdr_address_t); DEQ_DECLARE(qdr_address_t, qdr_address_list_t); -qdr_address_t *qdr_address(qd_address_semantics_t semantics); +qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_semantics_t semantics); qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_semantics_t semantics); void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link); @@ -385,6 +390,8 @@ struct qdr_core_t { qdr_node_t **routers_by_mask_bit; qdr_link_t **control_links_by_mask_bit; qdr_link_t **data_links_by_mask_bit; + + qdr_forwarder_t *forwarders[QD_SEMANTICS_LINK_BALANCED + 1]; }; typedef enum { @@ -396,6 +403,7 @@ typedef enum { void *router_core_thread(void *arg); void qdr_route_table_setup_CT(qdr_core_t *core); void qdr_agent_setup_CT(qdr_core_t *core); +void qdr_forwarder_setup_CT(qdr_core_t *core); qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char *label); void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/router_core_thread.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_thread.c b/src/router_core/router_core_thread.c index ce4b6a8..28e979f 100644 --- a/src/router_core/router_core_thread.c +++ b/src/router_core/router_core_thread.c @@ -38,6 +38,7 @@ void *router_core_thread(void *arg) qdr_route_table_setup_CT(core); qdr_agent_setup_CT(core); + qdr_forwarder_setup_CT(core); qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", core->router_area, core->router_id); while (core->running) { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 33f4eba..cde02ef 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -59,7 +59,7 @@ qd_address_t* qd_address(qd_address_semantics_t semantics) DEQ_INIT(addr->rlinks); DEQ_INIT(addr->rnodes); addr->semantics = semantics; - addr->forwarder = qd_router_get_forwarder(semantics); + addr->forwarder = 0; //qd_router_get_forwarder(semantics); return addr; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
