Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 f03cca6bb -> 382fb1b03
DISPATCH-179 - Ported in the first handler: add_router Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/382fb1b0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/382fb1b0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/382fb1b0 Branch: refs/heads/tross-DISPATCH-179-1 Commit: 382fb1b030115bc173edd81119ececd76c215c3e Parents: f03cca6 Author: Ted Ross <[email protected]> Authored: Wed Oct 21 11:57:27 2015 -0400 Committer: Ted Ross <[email protected]> Committed: Wed Oct 21 11:57:27 2015 -0400 ---------------------------------------------------------------------- src/router_core/route_tables.c | 82 +++++++++++++++++- src/router_core/route_tables.h | 22 ----- src/router_core/router_core.c | 81 +++++++++++++++-- src/router_core/router_core_private.h | 134 +++++++++++++++++++++++++++++ 4 files changed, 290 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/route_tables.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index bd102f0..d039ff1 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -18,7 +18,6 @@ */ #include "router_core_private.h" -//#include "route_tables.h" static qdr_action_t *qdr_action(qdr_action_handler_t action_handler); static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); @@ -33,6 +32,8 @@ static void qdrh_set_valid_origins(qdr_core_t *core, qdr_action_t *action); static void qdrh_map_destination(qdr_core_t *core, qdr_action_t *action); static void qdrh_unmap_destination(qdr_core_t *core, qdr_action_t *action); +static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | 
QD_BYPASS_VALID_ORIGINS; + //================================================================================== // Interface Functions @@ -156,8 +157,87 @@ static void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action) // In-Thread Functions //================================================================================== +void qdr_route_table_setup(qdr_core_t *core) +{ + DEQ_INIT(core->addrs); + //DEQ_INIT(core->links); + DEQ_INIT(core->routers); + core->addr_hash = qd_hash(10, 32, 0); + + core->router_addr = qdr_add_local_address(core, "qdrouter", QD_SEMANTICS_ROUTER_CONTROL); + core->routerma_addr = qdr_add_local_address(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT); + core->hello_addr = qdr_add_local_address(core, "qdhello", QD_SEMANTICS_ROUTER_CONTROL); + + core->routers_by_mask_bit = NEW_PTR_ARRAY(qdr_node_t, qd_bitmask_width()); + for (int idx = 0; idx < qd_bitmask_width(); idx++) + core->routers_by_mask_bit[idx] = 0; +} + + static void qdrh_add_router(qdr_core_t *core, qdr_action_t *action) { + int router_maskbit = action->args.route_table.router_maskbit; + + if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) { + qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit out of range: %d", router_maskbit); + return; + } + + if (core->routers_by_mask_bit[router_maskbit] != 0) { + qd_log(core->log, QD_LOG_CRITICAL, "add_router: Router maskbit already in use: %d", router_maskbit); + return; + } + + // + // Hash lookup the address to ensure there isn't an existing router address. + // + qd_field_iterator_t *iter = action->args.route_table.address->iterator; + qdr_address_t *addr; + + qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH); + qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); + assert(addr == 0); + + // + // Create an address record for this router and insert it in the hash table. + // This record will be found whenever a "foreign" topological address to this + // remote router is looked up. 
+ // + addr = qdr_address(router_addr_semantics); + qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); + DEQ_INSERT_TAIL(core->addrs, addr); + + // + // Create a router-node record to represent the remote router. + // + qdr_node_t *rnode = new_qdr_node_t(); + DEQ_ITEM_INIT(rnode); + rnode->owning_addr = addr; + rnode->mask_bit = router_maskbit; + rnode->next_hop = 0; + rnode->peer_link = 0; + rnode->ref_count = 0; + rnode->valid_origins = qd_bitmask(0); + + DEQ_INSERT_TAIL(core->routers, rnode); + + // + // Link the router record to the address record. + // + qdr_add_node_ref(&addr->rnodes, rnode); + + // + // Link the router record to the router address records. + // + qdr_add_node_ref(&core->router_addr->rnodes, rnode); + qdr_add_node_ref(&core->routerma_addr->rnodes, rnode); + + // + // Add the router record to the mask-bit index. + // + core->routers_by_mask_bit[router_maskbit] = rnode; + + qdr_field_free(action->args.route_table.address); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/route_tables.h ---------------------------------------------------------------------- diff --git a/src/router_core/route_tables.h b/src/router_core/route_tables.h deleted file mode 100644 index 13980c4..0000000 --- a/src/router_core/route_tables.h +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef __core_route_tables_h__ -#define __core_route_tables_h__ 1 -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 7293caf..d713583 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -18,6 +18,12 @@ */ #include "router_core_private.h" +#include <stdio.h> + + +ALLOC_DEFINE(qdr_address_t); +ALLOC_DEFINE(qdr_node_t); +ALLOC_DEFINE(qdr_link_ref_t); qdr_core_t *qdr_core(void) @@ -67,12 +73,16 @@ ALLOC_DEFINE(qdr_field_t); qdr_field_t *qdr_field(const char *text) { - size_t length = strlen(text); - size_t ilength = length; + size_t length = strlen(text); + size_t ilength = length; + + if (length == 0) + return 0; + qdr_field_t *field = new_qdr_field_t(); qd_buffer_t *buf; - ZERO(field); + ZERO(field); while (length > 0) { buf = qd_buffer(); size_t cap = qd_buffer_capacity(buf); @@ -92,8 +102,67 @@ qdr_field_t *qdr_field(const char *text) void qdr_field_free(qdr_field_t *field) { - qd_field_iterator_free(field->iterator); - qd_buffer_list_free_buffers(&field->buffers); - free_qdr_field_t(field); + if (field) { + qd_field_iterator_free(field->iterator); + qd_buffer_list_free_buffers(&field->buffers); + free_qdr_field_t(field); + } +} + + +qdr_address_t *qdr_address(qd_address_semantics_t semantics) +{ + qdr_address_t *addr = new_qdr_address_t(); + ZERO(addr); + addr->semantics = semantics; + addr->forwarder = qd_router_get_forwarder(semantics); + return addr; +} + + 
+qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *address, qd_address_semantics_t semantics) +{ + char addr_string[1000]; + qdr_address_t *addr = 0; + qd_field_iterator_t *iter = 0; + + snprintf(addr_string, sizeof(addr_string), "L%s", address); + iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL); + + qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); + if (!addr) { + addr = qdr_address(semantics); + qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle); + DEQ_ITEM_INIT(addr); + DEQ_INSERT_TAIL(core->addrs, addr); + addr->block_deletion = true; + } + qd_field_iterator_free(iter); + return addr; +} + + +void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link) +{ + qdr_link_ref_t *ref = new_qdr_link_ref_t(); + DEQ_ITEM_INIT(ref); + ref->link = link; + link->ref = ref; + DEQ_INSERT_TAIL(*ref_list, ref); +} + + +void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link) +{ +} + + +void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode) +{ +} + + +void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode) +{ } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/382fb1b0/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 94ec4a1..5803299 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -25,6 +25,10 @@ #include <qpid/dispatch/log.h> #include <memory.h> +/** + * qdr_field_t - This type is used to pass variable-length fields (strings, etc.) into + * and out of the router-core thread. 
+ */ typedef struct { qd_buffer_list_t buffers; qd_field_iterator_t *iterator; @@ -33,6 +37,10 @@ typedef struct { qdr_field_t *qdr_field(const char *string); void qdr_field_free(qdr_field_t *field); + +/** + * qdr_action_t - This type represents one work item to be performed by the router-core thread. + */ typedef struct qdr_action_t qdr_action_t; typedef void (*qdr_action_handler_t) (qdr_core_t *core, qdr_action_t *action); @@ -55,6 +63,119 @@ struct qdr_action_t { ALLOC_DECLARE(qdr_action_t); DEQ_DECLARE(qdr_action_t, qdr_action_list_t); +typedef struct qdr_address_t qdr_address_t; +typedef struct qdr_node_t qdr_node_t; +typedef struct qdr_router_ref_t qdr_router_ref_t; +typedef struct qdr_link_ref_t qdr_link_ref_t; +typedef struct qdr_lrp_t qdr_lrp_t; +typedef struct qdr_lrp_ref_t qdr_lrp_ref_t; + +struct qdr_node_t { + DEQ_LINKS(qdr_node_t); + qdr_address_t *owning_addr; + int mask_bit; + qdr_node_t *next_hop; ///< Next hop node _if_ this is not a neighbor node + qdr_link_t *peer_link; ///< Outgoing link _if_ this is a neighbor node + uint32_t ref_count; + qd_bitmask_t *valid_origins; +}; + +ALLOC_DECLARE(qdr_node_t); +DEQ_DECLARE(qdr_node_t, qdr_node_list_t); + + +struct qdr_router_ref_t { + DEQ_LINKS(qdr_router_ref_t); + qdr_node_t *router; +}; + +ALLOC_DECLARE(qdr_router_ref_t); +DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t); + + +struct qdr_link_t { + DEQ_LINKS(qdr_link_t); + int mask_bit; ///< Unique mask bit if this is an inter-router link + qd_link_type_t link_type; + qd_direction_t link_direction; + qdr_address_t *owning_addr; ///< [ref] Address record that owns this link + //qd_waypoint_t *waypoint; ///< [ref] Waypoint that owns this link + qd_link_t *link; ///< [own] Link pointer + qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link + qdr_link_ref_t *ref; ///< Pointer to a containing reference object + char *target; ///< Target address for incoming links + qd_routed_event_list_t event_fifo; ///< FIFO of 
outgoing delivery/link events (no messages) + qd_routed_event_list_t msg_fifo; ///< FIFO of incoming or outgoing message deliveries + qd_router_delivery_list_t deliveries; ///< [own] outstanding unsettled deliveries + bool strip_inbound_annotations; ///<should the dispatch specific inbound annotations be stripped at the ingress router + bool strip_outbound_annotations; ///<should the dispatch specific outbound annotations be stripped at the egress router +}; + +struct qdr_link_ref_t { + DEQ_LINKS(qdr_link_ref_t); + qdr_link_t *link; +}; + +ALLOC_DECLARE(qdr_link_ref_t); +DEQ_DECLARE(qdr_link_ref_t, qdr_link_ref_list_t); + + +struct qdr_lrp_t { + DEQ_LINKS(qdr_lrp_t); + char *prefix; + bool inbound; + bool outbound; + qd_lrp_container_t *container; +}; + +DEQ_DECLARE(qdr_lrp_t, qdr_lrp_list_t); + +struct qdr_lrp_ref_t { + DEQ_LINKS(qdr_lrp_ref_t); + qdr_lrp_t *lrp; +}; + +ALLOC_DECLARE(qdr_lrp_ref_t); +DEQ_DECLARE(qdr_lrp_ref_t, qdr_lrp_ref_list_t); + + +struct qdr_address_t { + DEQ_LINKS(qdr_address_t); + qd_router_message_cb_t on_message; ///< In-Process Message Consumer + void *on_message_context; ///< In-Process Consumer context + qdr_lrp_ref_list_t lrps; ///< Local link-route destinations + qdr_link_ref_list_t rlinks; ///< Locally-Connected Consumers + qdr_router_ref_list_t rnodes; ///< Remotely-Connected Consumers + qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry + qd_address_semantics_t semantics; + bool toggle; + bool waypoint; + bool block_deletion; + qd_router_forwarder_t *forwarder; + + /**@name Statistics */ + ///@{ + uint64_t deliveries_ingress; + uint64_t deliveries_egress; + uint64_t deliveries_transit; + uint64_t deliveries_to_container; + uint64_t deliveries_from_container; + ///@} +}; + +ALLOC_DECLARE(qdr_address_t); +DEQ_DECLARE(qdr_address_t, qdr_address_list_t); + +qdr_address_t *qdr_address(qd_address_semantics_t semantics); +qdr_address_t *qdr_add_local_address(qdr_core_t *core, const char *addr, qd_address_semantics_t 
semantics); + +void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link); +void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link); + +void qdr_add_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode); +void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode); + + struct qdr_core_t { qd_log_source_t *log; sys_cond_t *cond; @@ -67,6 +188,19 @@ struct qdr_core_t { qdr_mobile_added_t rt_mobile_added; qdr_mobile_removed_t rt_mobile_removed; qdr_link_lost_t rt_link_lost; + + const char *router_area; + const char *router_id; + + qdr_address_list_t addrs; + qd_hash_t *addr_hash; + qdr_address_t *router_addr; + qdr_address_t *routerma_addr; + qdr_address_t *hello_addr; + + //qdr_link_list_t links; + qdr_node_list_t routers; + qdr_node_t **routers_by_mask_bit; }; void *router_core_thread(void *arg); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
