Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 8be2a0a92 -> 45b627bde


DISPATCH-179 - Converted Embedded Python IoAdapter to use router core for IO.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/45b627bd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/45b627bd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/45b627bd

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 45b627bde0fc702fa6aa05cdd2f95a56df7a5edc
Parents: 8be2a0a
Author: Ted Ross <[email protected]>
Authored: Tue Dec 22 15:39:29 2015 -0500
Committer: Ted Ross <[email protected]>
Committed: Tue Dec 22 15:39:29 2015 -0500

----------------------------------------------------------------------
 python/qpid_dispatch_internal/router/engine.py |  2 +
 src/CMakeLists.txt                             |  1 +
 src/python_embedded.c                          | 27 +++++----
 src/router_core/route_tables.c                 | 13 ++--
 src/router_core/router_core.c                  |  4 +-
 src/router_core/router_core_private.h          | 17 +++++-
 src/router_core/transfer.c                     | 50 +++++++++++++---
 src/router_node.c                              | 66 ---------------------
 8 files changed, 83 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/python/qpid_dispatch_internal/router/engine.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/engine.py 
b/python/qpid_dispatch_internal/router/engine.py
index 1931db4..768b68e 100644
--- a/python/qpid_dispatch_internal/router/engine.py
+++ b/python/qpid_dispatch_internal/router/engine.py
@@ -54,6 +54,8 @@ class RouterEngine:
         self._log_general   = LogAdapter("ROUTER")
         self.io_adapter     = [IoAdapter(self.receive, "qdrouter"),
                                IoAdapter(self.receive, "qdrouter.ma"),
+                               IoAdapter(self.receive, "qdrouter", 'T'),
+                               IoAdapter(self.receive, "qdrouter.ma", 'T'),
                                IoAdapter(self.receive, "qdhello")]
         self.max_routers    = max_routers
         self.id             = router_id

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 333b4f0..73b3144 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -76,6 +76,7 @@ set(qpid_dispatch_SOURCES
   router_core/route_tables.c
   router_core/management_agent.c
   router_core/terminus.c
+  router_core/transfer.c
   router_delivery.c
   router_node.c
   router_forwarders.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/python_embedded.c
----------------------------------------------------------------------
diff --git a/src/python_embedded.c b/src/python_embedded.c
index 25b2e4d..d8bab3f 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -434,9 +434,10 @@ static PyTypeObject LogAdapterType = {
 
 typedef struct {
     PyObject_HEAD
-    PyObject       *handler;
-    qd_dispatch_t  *qd;
-    qd_address_t   *addr;
+    PyObject           *handler;
+    qd_dispatch_t      *qd;
+    qdr_core_t         *core;
+    qdr_subscription_t *sub;
 } IoAdapter;
 
 // Parse an iterator to a python object.
@@ -526,21 +527,21 @@ static void qd_io_rx_handler(void *context, qd_message_t 
*msg, int link_id)
 static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds)
 {
     PyObject *addr;
-    int global = 0;
-    if (!PyArg_ParseTuple(args, "OO|i", &self->handler, &addr, &global))
+    char aclass = 'L';
+    char phase  = '0';
+    if (!PyArg_ParseTuple(args, "OO|cc", &self->handler, &addr, &aclass, 
&phase))
         return -1;
     if (!PyCallable_Check(self->handler)) {
         PyErr_SetString(PyExc_TypeError, "IoAdapter.__init__ handler is not 
callable");
         return -1;
     }
     Py_INCREF(self->handler);
-    self->qd = dispatch;
+    self->qd   = dispatch;
+    self->core = qd_router_core(self->qd);
     const char *address = PyString_AsString(addr);
     if (!address) return -1;
     qd_error_clear();
-    self->addr =
-        qd_router_register_address(self->qd, address, qd_io_rx_handler, self,
-                                   py_semantics, global, 0);
+    self->sub = qdr_core_subscribe(self->core, address, aclass, phase, 
py_semantics, qd_io_rx_handler, self);
     if (qd_error_code()) {
         PyErr_SetString(PyExc_RuntimeError, qd_error_message());
         return -1;
@@ -550,8 +551,7 @@ static int IoAdapter_init(IoAdapter *self, PyObject *args, 
PyObject *kwds)
 
 static void IoAdapter_dealloc(IoAdapter* self)
 {
-    qd_router_unregister_address(self->addr);
-    free(self->addr);
+    qdr_core_unsubscribe(self->sub);
     Py_DECREF(self->handler);
     self->ob_type->tp_free((PyObject*)self);
 }
@@ -595,8 +595,9 @@ static PyObject *qd_python_send(PyObject *self, PyObject 
*args)
     IoAdapter           *ioa   = (IoAdapter*) self;
     qd_composed_field_t *field = 0;
     PyObject *message = 0;
+    int       no_echo = 1;
 
-    if (!PyArg_ParseTuple(args, "O", &message))
+    if (!PyArg_ParseTuple(args, "O|i", &message, &no_echo))
         return 0;
 
     if (compose_python_message(&field, message, ioa->qd) == QD_ERROR_NONE) {
@@ -604,7 +605,7 @@ static PyObject *qd_python_send(PyObject *self, PyObject 
*args)
         qd_message_compose_2(msg, field);
         PyObject *address = PyObject_GetAttrString(message, "address");
         if (address) {
-            qd_router_send2(ioa->qd, PyString_AsString(address), msg);
+            qdr_send_to(ioa->core, msg, PyString_AsString(address), (bool) 
no_echo);
             Py_DECREF(address);
         }
         qd_compose_free(field);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 1007e8c..ad0a035 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -175,9 +175,11 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
     DEQ_INIT(core->routers);
     core->addr_hash = qd_hash(10, 32, 0);
 
-    core->router_addr   = qdr_add_local_address_CT(core, "qdrouter",    
QD_SEMANTICS_ROUTER_CONTROL);
-    core->routerma_addr = qdr_add_local_address_CT(core, "qdrouter.ma", 
QD_SEMANTICS_DEFAULT);
-    core->hello_addr    = qdr_add_local_address_CT(core, "qdhello",     
QD_SEMANTICS_ROUTER_CONTROL);
+    core->hello_addr      = qdr_add_local_address_CT(core, 'L', "qdhello",     
QD_SEMANTICS_ROUTER_CONTROL);
+    core->router_addr_L   = qdr_add_local_address_CT(core, 'L', "qdrouter",    
QD_SEMANTICS_ROUTER_CONTROL);
+    core->routerma_addr_L = qdr_add_local_address_CT(core, 'L', "qdrouter.ma", 
QD_SEMANTICS_DEFAULT);
+    core->router_addr_T   = qdr_add_local_address_CT(core, 'T', "qdrouter",    
QD_SEMANTICS_ROUTER_CONTROL);
+    core->routerma_addr_T = qdr_add_local_address_CT(core, 'T', "qdrouter.ma", 
QD_SEMANTICS_DEFAULT);
 
     core->neighbor_free_mask = qd_bitmask(1);
 
@@ -259,9 +261,10 @@ static void qdr_add_router_CT(qdr_core_t *core, 
qdr_action_t *action, bool disca
 
         //
         // Link the router record to the router address records.
+        // Use the T-class addresses only.
         //
-        qdr_add_node_ref(&core->router_addr->rnodes, rnode);
-        qdr_add_node_ref(&core->routerma_addr->rnodes, rnode);
+        qdr_add_node_ref(&core->router_addr_T->rnodes, rnode);
+        qdr_add_node_ref(&core->routerma_addr_T->rnodes, rnode);
 
         //
         // Add the router record to the mask-bit index.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index fa82c06..a61e72c 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -166,13 +166,13 @@ qdr_address_t *qdr_address(qd_address_semantics_t 
semantics)
 }
 
 
-qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, const char *address, 
qd_address_semantics_t semantics)
+qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const 
char *address, qd_address_semantics_t semantics)
 {
     char                 addr_string[1000];
     qdr_address_t       *addr = 0;
     qd_field_iterator_t *iter = 0;
 
-    snprintf(addr_string, sizeof(addr_string), "L%s", address);
+    snprintf(addr_string, sizeof(addr_string), "%c%s", aclass, address);
     iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL);
 
     qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 1cc5cf2..25c7d89 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -85,6 +85,7 @@ struct qdr_action_t {
             qd_address_semantics_t  semantics;
             qdr_subscription_t     *subscription;
             qd_message_t           *message;
+            bool                    exclude_inprocess;
         } io;
 
         //
@@ -155,6 +156,14 @@ ALLOC_DECLARE(qdr_router_ref_t);
 DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
 
 
+struct qdr_delivery_t {
+    DEQ_LINKS(qdr_delivery_t);
+};
+
+ALLOC_DECLARE(qdr_delivery_t);
+DEQ_DECLARE(qdr_delivery_t, qdr_delivery_list_t);
+
+
 struct qdr_link_t {
     DEQ_LINKS(qdr_link_t);
     qdr_core_t               *core;
@@ -243,7 +252,7 @@ ALLOC_DECLARE(qdr_address_t);
 DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
 
 qdr_address_t *qdr_address(qd_address_semantics_t semantics);
-qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, const char *addr, 
qd_address_semantics_t semantics);
+qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const 
char *addr, qd_address_semantics_t semantics);
 
 void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
 void qdr_del_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
@@ -364,9 +373,11 @@ struct qdr_core_t {
 
     qdr_address_list_t    addrs;
     qd_hash_t            *addr_hash;
-    qdr_address_t        *router_addr;
-    qdr_address_t        *routerma_addr;
     qdr_address_t        *hello_addr;
+    qdr_address_t        *router_addr_L;
+    qdr_address_t        *routerma_addr_L;
+    qdr_address_t        *router_addr_T;
+    qdr_address_t        *routerma_addr_T;
 
     qdr_node_list_t       routers;
     qd_bitmask_t         *neighbor_free_mask;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index d8a05ee..8911339 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -21,6 +21,8 @@
 #include <qpid/dispatch/amqp.h>
 #include <stdio.h>
 
+ALLOC_DEFINE(qdr_delivery_t);
+
 static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);
 static void qdr_link_deliver_to_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);
@@ -30,29 +32,37 @@ static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t 
*action, bool discard)
 // Internal Functions
 
//==================================================================================
 
+
 
//==================================================================================
 // Interface Functions
 
//==================================================================================
 
 qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, 
qd_message_t *msg)
 {
-    qdr_action_t *action = qdr_action(qdr_link_delivery_CT, "link_delivery");
+    qdr_action_t   *action = qdr_action(qdr_link_deliver_CT, "link_deliver");
+    qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
-    qdr_action_enqueue(core, action);
+    qdr_action_enqueue(link->core, action);
+    return dlv;
 }
 
 
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, 
qd_message_t *msg, qd_field_iterator_t *addr)
 {
-    qdr_action_t *action = qdr_action(qdr_link_delivery_to_CT, 
"link_delivery_to");
+    qdr_action_t   *action = qdr_action(qdr_link_deliver_to_CT, 
"link_deliver_to");
+    qdr_delivery_t *dlv    = new_qdr_delivery_t();
 
-    qdr_action_enqueue(core, action);
+    qdr_action_enqueue(link->core, action);
+    return dlv;
 }
 
 
 void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool 
exclude_inprocess)
 {
     qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
+    action->args.io.address           = qdr_field(addr);
+    action->args.io.message           = qd_message_copy(msg);
+    action->args.io.exclude_inprocess = exclude_inprocess;
 
     qdr_action_enqueue(core, action);
 }
@@ -62,14 +72,25 @@ void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const 
char *addr, bool exc
 // In-Thread Functions
 
//==================================================================================
 
-static void qdr_link_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard)
+static void qdr_route_message_CT(qdr_core_t     *core,
+                                 qdr_address_t  *addr,
+                                 qd_message_t   *msg,
+                                 qdr_delivery_t *dlv,
+                                 bool            exclude_inprocess)
+{
+    const char *key = (const char*) qd_hash_key_by_handle(addr->hash_handle);
+    printf("qdr_route_message_CT - %s, %s\n", key, exclude_inprocess ? "yes" : 
"no");
+}
+
+
+static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard)
 {
     if (discard)
         return;
 }
 
 
-static void qdr_link_delivery_to_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
+static void qdr_link_deliver_to_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
 {
     if (discard)
         return;
@@ -78,7 +99,20 @@ static void qdr_link_delivery_to_CT(qdr_core_t *core, 
qdr_action_t *action, bool
 
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard)
 {
-    if (discard)
-        return;
+    qdr_field_t  *addr_field = action->args.io.address;
+    qd_message_t *msg        = action->args.io.message;
+
+    if (!discard) {
+        qdr_address_t *addr;
+        qd_address_iterator_reset_view(addr_field->iterator, 
ITER_VIEW_ADDRESS_HASH);
+        qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) 
&addr);
+        if (addr)
+            qdr_route_message_CT(core, addr, msg, 0, 
action->args.io.exclude_inprocess);
+        else
+            qd_log(core->log, QD_LOG_DEBUG, "In-process send to an unknown 
address");
+    }
+
+    qdr_field_free(addr_field);
+    qd_message_free(msg);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/45b627bd/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 97b3fc1..33f4eba 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -999,7 +999,6 @@ static qd_node_type_t router_node = {"router", 0, 0,
                                      router_closed_handler};
 static int type_registered = 0;
 
-
 qd_router_t *qd_router(qd_dispatch_t *qd, qd_router_mode_t mode, const char 
*area, const char *id)
 {
     if (!type_registered) {
@@ -1055,16 +1054,6 @@ qd_router_t *qd_router(qd_dispatch_t *qd, 
qd_router_mode_t mode, const char *are
     DEQ_INIT(router->waypoints);
 
     //
-    // Create addresses for all of the routers in the topology.  It will be 
registered
-    // locally later in the initialization sequence.
-    //
-    if (router->router_mode == QD_ROUTER_MODE_INTERIOR) {
-        router->router_addr   = qd_router_register_address(qd, "qdrouter", 0, 
0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
-        router->routerma_addr = qd_router_register_address(qd, "qdrouter.ma", 
0, 0, QD_SEMANTICS_DEFAULT, false, 0);
-        router->hello_addr    = qd_router_register_address(qd, "qdhello", 0, 
0, QD_SEMANTICS_ROUTER_CONTROL, false, 0);
-    }
-
-    //
     // Inform the field iterator module of this router's id and area.  The 
field iterator
     // uses this to offload some of the address-processing load from the 
router.
     //
@@ -1159,11 +1148,6 @@ void qd_router_setup_late(qd_dispatch_t *qd)
 
     qd_router_python_setup(qd->router);
     qd_timer_schedule(qd->router->timer, 1000);
-
-    //Register the C management agent
-    // DEPRECATE
-    qd_router_register_address(qd, CORE_AGENT_ADDRESS, 
management_agent_handler, (void *) qd, QD_SEMANTICS_DEFAULT, true, 
0/*forwarder*/);
-    qd_router_register_address(qd, CORE_AGENT_ADDRESS, 
management_agent_handler, (void *) qd, QD_SEMANTICS_DEFAULT, false, 
0/*forwarder*/);
 }
 
 void qd_router_free(qd_router_t *router)
@@ -1217,56 +1201,6 @@ qdr_core_t *qd_router_core(qd_dispatch_t *qd)
 }
 
 
-qd_address_t *qd_router_register_address(qd_dispatch_t          *qd,
-                                         const char             *address,
-                                         qd_router_message_cb_t  on_message,
-                                         void                   *context,
-                                         qd_address_semantics_t  semantics,
-                                         bool                    global,
-                                         qd_router_forwarder_t  *forwarder)
-{
-    char                 addr_string[1000];
-    qd_router_t         *router = qd->router;
-    qd_address_t        *addr = 0;
-    qd_field_iterator_t *iter = 0;
-
-    snprintf(addr_string, sizeof(addr_string), "%s%s", global ? "M0" : "L", 
address);
-    iter = qd_address_iterator_string(addr_string, ITER_VIEW_NO_HOST);
-
-    sys_mutex_lock(router->lock);
-    qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
-    if (!addr) {
-        addr = qd_address(semantics);
-        qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
-        DEQ_ITEM_INIT(addr);
-        DEQ_INSERT_TAIL(router->addrs, addr);
-        qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
-    }
-    qd_field_iterator_free(iter);
-
-    addr->on_message         = on_message;
-    addr->on_message_context = context;
-    if (forwarder) {
-        if (addr->forwarder) addr->forwarder->release(addr->forwarder);
-        addr->forwarder = forwarder;
-    }
-
-    sys_mutex_unlock(router->lock);
-
-    if (on_message)
-        qd_log(router->log_source, QD_LOG_INFO, "In-Process Address 
Registered: %s", address);
-    assert(addr);
-    return addr;
-}
-
-
-void qd_router_unregister_address(qd_address_t *ad)
-{
-    // if (ad->forwarder) ad->forwarder->release(ad->forwarder);
-    //free_qd_address_t(ad);
-}
-
-
 qd_address_t *qd_router_address_lookup_LH(qd_router_t *router,
                                           qd_field_iterator_t *addr_iter,
                                           bool *is_local, bool *is_direct)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to