Repository: qpid-dispatch Updated Branches: refs/heads/tross-DISPATCH-179-1 8be2a0a92 -> 45b627bde
DISPATCH-179 - Converted Embedded Python IoAdapter to use router core for IO. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/45b627bd Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/45b627bd Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/45b627bd Branch: refs/heads/tross-DISPATCH-179-1 Commit: 45b627bde0fc702fa6aa05cdd2f95a56df7a5edc Parents: 8be2a0a Author: Ted Ross <[email protected]> Authored: Tue Dec 22 15:39:29 2015 -0500 Committer: Ted Ross <[email protected]> Committed: Tue Dec 22 15:39:29 2015 -0500 ---------------------------------------------------------------------- python/qpid_dispatch_internal/router/engine.py | 2 + src/CMakeLists.txt | 1 + src/python_embedded.c | 27 +++++---- src/router_core/route_tables.c | 13 ++-- src/router_core/router_core.c | 4 +- src/router_core/router_core_private.h | 17 +++++- src/router_core/transfer.c | 50 +++++++++++++--- src/router_node.c | 66 --------------------- 8 files changed, 83 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/python/qpid_dispatch_internal/router/engine.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/engine.py b/python/qpid_dispatch_internal/router/engine.py index 1931db4..768b68e 100644 --- a/python/qpid_dispatch_internal/router/engine.py +++ b/python/qpid_dispatch_internal/router/engine.py @@ -54,6 +54,8 @@ class RouterEngine: self._log_general = LogAdapter("ROUTER") self.io_adapter = [IoAdapter(self.receive, "qdrouter"), IoAdapter(self.receive, "qdrouter.ma"), + IoAdapter(self.receive, "qdrouter", 'T'), + IoAdapter(self.receive, "qdrouter.ma", 'T'), IoAdapter(self.receive, "qdhello")] self.max_routers = max_routers self.id = router_id http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 333b4f0..73b3144 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -76,6 +76,7 @@ set(qpid_dispatch_SOURCES router_core/route_tables.c router_core/management_agent.c router_core/terminus.c + router_core/transfer.c router_delivery.c router_node.c router_forwarders.c http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/python_embedded.c ---------------------------------------------------------------------- diff --git a/src/python_embedded.c b/src/python_embedded.c index 25b2e4d..d8bab3f 100644 --- a/src/python_embedded.c +++ b/src/python_embedded.c @@ -434,9 +434,10 @@ static PyTypeObject LogAdapterType = { typedef struct { PyObject_HEAD - PyObject *handler; - qd_dispatch_t *qd; - qd_address_t *addr; + PyObject *handler; + qd_dispatch_t *qd; + qdr_core_t *core; + qdr_subscription_t *sub; } IoAdapter; // Parse an iterator to a python object. @@ -526,21 +527,21 @@ static void qd_io_rx_handler(void *context, qd_message_t *msg, int link_id) static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds) { PyObject *addr; - int global = 0; - if (!PyArg_ParseTuple(args, "OO|i", &self->handler, &addr, &global)) + char aclass = 'L'; + char phase = '0'; + if (!PyArg_ParseTuple(args, "OO|cc", &self->handler, &addr, &aclass, &phase)) return -1; if (!PyCallable_Check(self->handler)) { PyErr_SetString(PyExc_TypeError, "IoAdapter.__init__ handler is not callable"); return -1; } Py_INCREF(self->handler); - self->qd = dispatch; + self->qd = dispatch; + self->core = qd_router_core(self->qd); const char *address = PyString_AsString(addr); if (!address) return -1; qd_error_clear(); - self->addr = - qd_router_register_address(self->qd, address, qd_io_rx_handler, self, - py_semantics, global, 0); + self->sub = qdr_core_subscribe(self->core, address, aclass, phase, py_semantics, qd_io_rx_handler, self); if (qd_error_code()) { PyErr_SetString(PyExc_RuntimeError, qd_error_message()); return -1; @@ -550,8 +551,7 @@ static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds) static void IoAdapter_dealloc(IoAdapter* self) { - qd_router_unregister_address(self->addr); - free(self->addr); + qdr_core_unsubscribe(self->sub); Py_DECREF(self->handler); self->ob_type->tp_free((PyObject*)self); } @@ -595,8 +595,9 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args) IoAdapter *ioa = (IoAdapter*) self; qd_composed_field_t *field = 0; PyObject *message = 0; + int no_echo = 1; - if (!PyArg_ParseTuple(args, "O", &message)) + if (!PyArg_ParseTuple(args, "O|i", &message, &no_echo)) return 0; if (compose_python_message(&field, message, ioa->qd) == QD_ERROR_NONE) { @@ -604,7 +605,7 @@ static PyObject *qd_python_send(PyObject *self, PyObject *args) qd_message_compose_2(msg, field); PyObject *address = PyObject_GetAttrString(message, "address"); if (address) { - qd_router_send2(ioa->qd, PyString_AsString(address), msg); + qdr_send_to(ioa->core, msg, PyString_AsString(address), (bool) no_echo); Py_DECREF(address); } qd_compose_free(field); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_core/route_tables.c ---------------------------------------------------------------------- diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c index 1007e8c..ad0a035 100644 --- a/src/router_core/route_tables.c +++ b/src/router_core/route_tables.c @@ -175,9 +175,11 @@ void qdr_route_table_setup_CT(qdr_core_t *core) DEQ_INIT(core->routers); core->addr_hash = qd_hash(10, 32, 0); - core->router_addr = qdr_add_local_address_CT(core, "qdrouter", QD_SEMANTICS_ROUTER_CONTROL); - core->routerma_addr = qdr_add_local_address_CT(core, "qdrouter.ma", QD_SEMANTICS_DEFAULT); - core->hello_addr = qdr_add_local_address_CT(core, "qdhello", QD_SEMANTICS_ROUTER_CONTROL); + core->hello_addr = qdr_add_local_address_CT(core, 'L', "qdhello", QD_SEMANTICS_ROUTER_CONTROL); + core->router_addr_L = qdr_add_local_address_CT(core, 'L', "qdrouter", QD_SEMANTICS_ROUTER_CONTROL); + core->routerma_addr_L = qdr_add_local_address_CT(core, 'L', "qdrouter.ma", QD_SEMANTICS_DEFAULT); + core->router_addr_T = qdr_add_local_address_CT(core, 'T', "qdrouter", QD_SEMANTICS_ROUTER_CONTROL); + core->routerma_addr_T = qdr_add_local_address_CT(core, 'T', "qdrouter.ma", QD_SEMANTICS_DEFAULT); core->neighbor_free_mask = qd_bitmask(1); @@ -259,9 +261,10 @@ static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool disca // // Link the router record to the router address records. + // Use the T-class addresses only. // - qdr_add_node_ref(&core->router_addr->rnodes, rnode); - qdr_add_node_ref(&core->routerma_addr->rnodes, rnode); + qdr_add_node_ref(&core->router_addr_T->rnodes, rnode); + qdr_add_node_ref(&core->routerma_addr_T->rnodes, rnode); // // Add the router record to the mask-bit index. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index fa82c06..a61e72c 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -166,13 +166,13 @@ qdr_address_t *qdr_address(qd_address_semantics_t semantics) } -qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, const char *address, qd_address_semantics_t semantics) +qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *address, qd_address_semantics_t semantics) { char addr_string[1000]; qdr_address_t *addr = 0; qd_field_iterator_t *iter = 0; - snprintf(addr_string, sizeof(addr_string), "L%s", address); + snprintf(addr_string, sizeof(addr_string), "%c%s", aclass, address); iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL); qd_hash_retrieve(core->addr_hash, iter, (void**) &addr); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 1cc5cf2..25c7d89 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -85,6 +85,7 @@ struct qdr_action_t { qd_address_semantics_t semantics; qdr_subscription_t *subscription; qd_message_t *message; + bool exclude_inprocess; } io; // @@ -155,6 +156,14 @@ ALLOC_DECLARE(qdr_router_ref_t); DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t); +struct qdr_delivery_t { + DEQ_LINKS(qdr_delivery_t); +}; + +ALLOC_DECLARE(qdr_delivery_t); +DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t); + + struct qdr_link_t { DEQ_LINKS(qdr_link_t); qdr_core_t *core; @@ -243,7 +252,7 @@ ALLOC_DECLARE(qdr_address_t); DEQ_DECLARE(qdr_address_t, qdr_address_list_t); qdr_address_t *qdr_address(qd_address_semantics_t semantics); -qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, const char *addr, qd_address_semantics_t semantics); +qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const char *addr, qd_address_semantics_t semantics); void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link); void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link); @@ -364,9 +373,11 @@ struct qdr_core_t { qdr_address_list_t addrs; qd_hash_t *addr_hash; - qdr_address_t *router_addr; - qdr_address_t *routerma_addr; qdr_address_t *hello_addr; + qdr_address_t *router_addr_L; + qdr_address_t *routerma_addr_L; + qdr_address_t *router_addr_T; + qdr_address_t *routerma_addr_T; qdr_node_list_t routers; qd_bitmask_t *neighbor_free_mask; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_core/transfer.c ---------------------------------------------------------------------- diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index d8a05ee..8911339 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -21,6 +21,8 @@ #include <qpid/dispatch/amqp.h> #include <stdio.h> +ALLOC_DEFINE(qdr_delivery_t); + static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_link_deliver_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard); @@ -30,29 +32,37 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) // Internal Functions //================================================================================== + //================================================================================== // Interface Functions //================================================================================== qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg) { - qdr_action_t *action = qdr_action(qdr_link_delivery_CT, "link_delivery"); + qdr_action_t *action = qdr_action(qdr_link_deliver_CT, "link_deliver"); + qdr_delivery_t *dlv = new_qdr_delivery_t(); - qdr_action_enqueue(core, action); + qdr_action_enqueue(link->core, action); + return dlv; } qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr) { - qdr_action_t *action = qdr_action(qdr_link_delivery_to_CT, "link_delivery_to"); + qdr_action_t *action = qdr_action(qdr_link_deliver_to_CT, "link_deliver_to"); + qdr_delivery_t *dlv = new_qdr_delivery_t(); - qdr_action_enqueue(core, action); + qdr_action_enqueue(link->core, action); + return dlv; } void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess) { qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to"); + action->args.io.address = qdr_field(addr); + action->args.io.message = qd_message_copy(msg); + action->args.io.exclude_inprocess = exclude_inprocess; qdr_action_enqueue(core, action); } @@ -62,14 +72,25 @@ void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exc // In-Thread Functions //================================================================================== -static void qdr_link_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_route_message_CT(qdr_core_t *core, + qdr_address_t *addr, + qd_message_t *msg, + qdr_delivery_t *dlv, + bool exclude_inprocess) +{ + const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle); + printf("qdr_route_message_CT - %s, %s\n", key, exclude_inprocess ? "yes" : "no"); +} + + +static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (discard) return; } -static void qdr_link_delivery_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_link_deliver_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (discard) return; @@ -78,7 +99,20 @@ static void qdr_link_delivery_to_CT(qdr_core_t *core, qdr_action_t *action, bool static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { - if (discard) - return; + qdr_field_t *addr_field = action->args.io.address; + qd_message_t *msg = action->args.io.message; + + if (!discard) { + qdr_address_t *addr; + qd_address_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH); + qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &addr); + if (addr) + qdr_route_message_CT(core, addr, msg, 0, action->args.io.exclude_inprocess); + else + qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown address"); + } + + qdr_field_free(addr_field); + qd_message_free(msg); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 97b3fc1..33f4eba 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -999,7 +999,6 @@ static qd_node_type_t router_node = {"router", 0, 0, router_closed_handler}; static int type_registered = 0; - qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, const char *id) { if (!type_registered) { @@ -1055,16 +1054,6 @@ qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char *are DEQ_INIT(router->waypoints); // - // Create addresses for all of the routers in the topology. It will be registered - // locally later in the initialization sequence. - // - if (router->router_mode == QD_ROUTER_MODE_INTERIOR) { - router->router_addr = qd_router_register_address(qd, "qdrouter", 0, 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0); - router->routerma_addr = qd_router_register_address(qd, "qdrouter.ma", 0, 0, QD_SEMANTICS_DEFAULT, false, 0); - router->hello_addr = qd_router_register_address(qd, "qdhello", 0, 0, QD_SEMANTICS_ROUTER_CONTROL, false, 0); - } - - // // Inform the field iterator module of this router's id and area. The field iterator // uses this to offload some of the address-processing load from the router. // @@ -1159,11 +1148,6 @@ void qd_router_setup_late(qd_dispatch_t *qd) qd_router_python_setup(qd->router); qd_timer_schedule(qd->router->timer, 1000); - - //Register the C management agent - // DEPRECATE - qd_router_register_address(qd, CORE_AGENT_ADDRESS, management_agent_handler, (void *) qd, QD_SEMANTICS_DEFAULT, true, 0/*forwarder*/); - qd_router_register_address(qd, CORE_AGENT_ADDRESS, management_agent_handler, (void *) qd, QD_SEMANTICS_DEFAULT, false, 0/*forwarder*/); } void qd_router_free(qd_router_t *router) @@ -1217,56 +1201,6 @@ qdr_core_t *qd_router_core(qd_dispatch_t *qd) } -qd_address_t *qd_router_register_address(qd_dispatch_t *qd, - const char *address, - qd_router_message_cb_t on_message, - void *context, - qd_address_semantics_t semantics, - bool global, - qd_router_forwarder_t *forwarder) -{ - char addr_string[1000]; - qd_router_t *router = qd->router; - qd_address_t *addr = 0; - qd_field_iterator_t *iter = 0; - - snprintf(addr_string, sizeof(addr_string), "%s%s", global ? "M0" : "L", address); - iter = qd_address_iterator_string(addr_string, ITER_VIEW_NO_HOST); - - sys_mutex_lock(router->lock); - qd_hash_retrieve(router->addr_hash, iter, (void**) &addr); - if (!addr) { - addr = qd_address(semantics); - qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle); - DEQ_ITEM_INIT(addr); - DEQ_INSERT_TAIL(router->addrs, addr); - qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr); - } - qd_field_iterator_free(iter); - - addr->on_message = on_message; - addr->on_message_context = context; - if (forwarder) { - if (addr->forwarder) addr->forwarder->release(addr->forwarder); - addr->forwarder = forwarder; - } - - sys_mutex_unlock(router->lock); - - if (on_message) - qd_log(router->log_source, QD_LOG_INFO, "In-Process Address Registered: %s", address); - assert(addr); - return addr; -} - - -void qd_router_unregister_address(qd_address_t *ad) -{ - // if (ad->forwarder) ad->forwarder->release(ad->forwarder); - //free_qd_address_t(ad); -} - - qd_address_t *qd_router_address_lookup_LH(qd_router_t *router, qd_field_iterator_t *addr_iter, bool *is_local, bool *is_direct) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
