Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 40b444015 -> fe3157c51


DISPATCH-179 - Added forwarder infrastructure to the router core.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/fe3157c5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/fe3157c5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/fe3157c5

Branch: refs/heads/tross-DISPATCH-179-1
Commit: fe3157c5104ee6e1b0400dd71ba28e115228dc59
Parents: 40b4440
Author: Ted Ross <[email protected]>
Authored: Mon Jan 4 11:11:34 2016 -0500
Committer: Ted Ross <[email protected]>
Committed: Mon Jan 4 11:11:34 2016 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router.h                 |  70 ++---------
 python/qpid_dispatch_internal/router/engine.py |  10 +-
 src/CMakeLists.txt                             |   2 +-
 src/python_embedded.c                          |  16 ++-
 src/router_config.c                            |  41 ++++---
 src/router_core/connections.c                  |   9 +-
 src/router_core/forwarder.c                    | 122 ++++++++++++++++++++
 src/router_core/route_tables.c                 |  18 ++-
 src/router_core/router_core.c                  |   7 +-
 src/router_core/router_core_private.h          |  14 ++-
 src/router_core/router_core_thread.c           |   1 +
 src/router_node.c                              |   2 +-
 12 files changed, 193 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/include/qpid/dispatch/router.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router.h b/include/qpid/dispatch/router.h
index 30b9022..499e808 100644
--- a/include/qpid/dispatch/router.h
+++ b/include/qpid/dispatch/router.h
@@ -34,71 +34,17 @@
 
 typedef struct qd_router_t  qd_router_t;
 typedef struct qd_address_t qd_address_t;
-typedef uint8_t             qd_address_semantics_t;
 typedef struct qd_router_delivery_t qd_router_delivery_t;
 
-#include <qpid/dispatch/router_core.h>
-
-/**
- * @name Address fanout semantics
- * @{
- */
-#define QD_FANOUTMASK      0x03
-#define QD_FANOUT_SINGLE   0x00 ///< Message will be delivered to a single 
consumer.
-#define QD_FANOUT_MULTIPLE 0x01 ///< Message will be delivered to multiple 
consumers.
-#define QD_FANOUT_GROUP    0x02 ///< Message will be delivered to one consumer 
per group.
-#define QD_FANOUT(d) (d & QD_FANOUTMASK) ///< Get fanout bits.
-///@}
-
-/**
- * @name Address bias semantics for SINGLE/GROUP fanout
- * @{
- */
-
-#define QD_BIASMASK     0x0c
-#define QD_BIAS_NONE    0x00 ///< Apply no bias (also used for multiple 
fanout).
-#define QD_BIAS_CLOSEST 0x04 ///< Message will be delivered to the closest 
(lowest cost) consumer.
-#define QD_BIAS_SPREAD  0x08 ///< Messages will be spread arbitrarily across 
all consumers.
-#define QD_BIAS_LATENCY 0x0c ///< Messages will be spread to minimize latency 
in light of each consumer's rate of consumption.
-#define QD_BIAS(d) (d & QD_BIASMASK)
-///@}
-
+typedef enum {
+    QD_SEMANTICS_MULTICAST_FLOOD  = 0,
+    QD_SEMANTICS_MULTICAST_ONCE   = 1,
+    QD_SEMANTICS_ANYCAST_CLOSEST  = 2,
+    QD_SEMANTICS_ANYCAST_BALANCED = 3,
+    QD_SEMANTICS_LINK_BALANCED    = 4
+} qd_address_semantics_t;
 
-/**
- * @name Address congestion semantics.
- *
- * This controls that the router will do with
- * received messages that are destined for congested destinations.
- * @{
- */
-#define QD_CONGESTIONMASK          0x30
-/** Drop/Release the message.*/
-#define QD_CONGESTION_DROP         0x00
- /**
-  * Stop issuing replacement credits to slow the producer.  This puts a cap on
-  * the total number of messages addressed to this address from a particular
-  * producer that can be buffered in the router.
-  */
-#define QD_CONGESTION_BACKPRESSURE 0x10
- /** Redirect messages to an alternate address. */
-#define QD_CONGESTION_REDIRECT     0x20
-#define QD_CONGESTION(d) (d & QD_CONGESTIONMASK)
-/// @}
-
-/** @name Other semantics
- * @{
- */
-#define QD_DROP_FOR_SLOW_CONSUMERS 0x40
-#define QD_BYPASS_VALID_ORIGINS    0x80
-///@}
-
-/**
- * @name Sematics groups
- * @{
- */
-#define QD_SEMANTICS_ROUTER_CONTROL (QD_FANOUT_MULTIPLE | QD_BIAS_NONE | 
QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS)
-#define QD_SEMANTICS_DEFAULT        (QD_FANOUT_MULTIPLE | QD_BIAS_NONE | 
QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS)
-///@}
+#include <qpid/dispatch/router_core.h>
 
 /** Message forwarding descriptor
  *

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/python/qpid_dispatch_internal/router/engine.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/engine.py 
b/python/qpid_dispatch_internal/router/engine.py
index 140e5c0..bda27aa 100644
--- a/python/qpid_dispatch_internal/router/engine.py
+++ b/python/qpid_dispatch_internal/router/engine.py
@@ -52,11 +52,11 @@ class RouterEngine:
         self._log_ls        = LogAdapter("ROUTER_LS")
         self._log_ma        = LogAdapter("ROUTER_MA")
         self._log_general   = LogAdapter("ROUTER")
-        self.io_adapter     = [IoAdapter(self.receive, "qdrouter"),
-                               IoAdapter(self.receive, "qdrouter.ma"),
-                               IoAdapter(self.receive, "qdrouter", 'T'),
-                               IoAdapter(self.receive, "qdrouter.ma", 'T'),
-                               IoAdapter(self.receive, "qdhello")]
+        self.io_adapter     = [IoAdapter(self.receive, "qdrouter",    'L', 
'0', SEMANTICS_MULTICAST_FLOOD),
+                               IoAdapter(self.receive, "qdrouter.ma", 'L', 
'0', SEMANTICS_MULTICAST_ONCE),
+                               IoAdapter(self.receive, "qdrouter",    'T', 
'0', SEMANTICS_MULTICAST_FLOOD),
+                               IoAdapter(self.receive, "qdrouter.ma", 'T', 
'0', SEMANTICS_MULTICAST_ONCE),
+                               IoAdapter(self.receive, "qdhello",     'L', 
'0', SEMANTICS_MULTICAST_FLOOD)]
         self.max_routers    = max_routers
         self.id             = router_id
         self.instance       = long(time.time())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 73b3144..e82d805 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -71,6 +71,7 @@ set(qpid_dispatch_SOURCES
   router_core/agent_link.c
   router_core/connections.c
   router_core/error.c
+  router_core/forwarder.c
   router_core/router_core.c
   router_core/router_core_thread.c
   router_core/route_tables.c
@@ -79,7 +80,6 @@ set(qpid_dispatch_SOURCES
   router_core/transfer.c
   router_delivery.c
   router_node.c
-  router_forwarders.c
   router_pynode.c
   schema_enum.c
   server.c

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/python_embedded.c
----------------------------------------------------------------------
diff --git a/src/python_embedded.c b/src/python_embedded.c
index 1eb4779..f8afe25 100644
--- a/src/python_embedded.c
+++ b/src/python_embedded.c
@@ -42,8 +42,6 @@ static PyObject        *dispatch_module = 0;
 static PyObject        *message_type = 0;
 static PyObject        *dispatch_python_pkgdir = 0;
 
-static qd_address_semantics_t py_semantics = QD_FANOUT_MULTIPLE | QD_BIAS_NONE 
| QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS;
-
 static void qd_python_setup(void);
 
 
@@ -527,9 +525,10 @@ static void qd_io_rx_handler(void *context, qd_message_t 
*msg, int link_id)
 static int IoAdapter_init(IoAdapter *self, PyObject *args, PyObject *kwds)
 {
     PyObject *addr;
-    char aclass = 'L';
-    char phase  = '0';
-    if (!PyArg_ParseTuple(args, "OO|cc", &self->handler, &addr, &aclass, 
&phase))
+    char aclass    = 'L';
+    char phase     = '0';
+    int  semantics = QD_SEMANTICS_ANYCAST_BALANCED;
+    if (!PyArg_ParseTuple(args, "OO|cci", &self->handler, &addr, &aclass, 
&phase, &semantics))
         return -1;
     if (!PyCallable_Check(self->handler)) {
         PyErr_SetString(PyExc_TypeError, "IoAdapter.__init__ handler is not 
callable");
@@ -541,7 +540,7 @@ static int IoAdapter_init(IoAdapter *self, PyObject *args, 
PyObject *kwds)
     const char *address = PyString_AsString(addr);
     if (!address) return -1;
     qd_error_clear();
-    self->sub = qdr_core_subscribe(self->core, address, aclass, phase, 
py_semantics, qd_io_rx_handler, self);
+    self->sub = qdr_core_subscribe(self->core, address, aclass, phase, 
semantics, qd_io_rx_handler, self);
     if (qd_error_code()) {
         PyErr_SetString(PyExc_RuntimeError, qd_error_message());
         return -1;
@@ -734,6 +733,11 @@ static void qd_python_setup(void)
         Py_INCREF(ioaType);
         PyModule_AddObject(m, "IoAdapter", (PyObject*) &IoAdapterType);
 
+        qd_register_constant(m, "SEMANTICS_MULTICAST_FLOOD",  
QD_SEMANTICS_MULTICAST_FLOOD);
+        qd_register_constant(m, "SEMANTICS_MULTICAST_ONCE",   
QD_SEMANTICS_MULTICAST_ONCE);
+        qd_register_constant(m, "SEMANTICS_ANYCAST_CLOSEST",  
QD_SEMANTICS_ANYCAST_CLOSEST);
+        qd_register_constant(m, "SEMANTICS_ANYCAST_BALANCED", 
QD_SEMANTICS_ANYCAST_BALANCED);
+
         Py_INCREF(m);
         dispatch_module = m;
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_config.c
----------------------------------------------------------------------
diff --git a/src/router_config.c b/src/router_config.c
index fc3a8c0..315a468 100644
--- a/src/router_config.c
+++ b/src/router_config.c
@@ -76,32 +76,31 @@ qd_error_t qd_router_configure_address(qd_router_t *router, 
qd_entity_t *entity)
 
     qd_address_semantics_t semantics = 0;
     switch(fanout) {
-      case QD_SCHEMA_FIXEDADDRESS_FANOUT_MULTIPLE: semantics |= 
QD_FANOUT_MULTIPLE; break;
-      case QD_SCHEMA_FIXEDADDRESS_FANOUT_SINGLE: semantics |= 
QD_FANOUT_SINGLE; break;
-      default:
-        free(prefix);
-        free(addr_phase);
-        return qd_error(QD_ERROR_CONFIG, "Invalid fanout value %d", fanout);
-    }
+    case QD_SCHEMA_FIXEDADDRESS_FANOUT_MULTIPLE:
+        semantics = QD_SEMANTICS_MULTICAST_ONCE;
+        break;
 
-    if ((semantics & QD_FANOUTMASK) == QD_FANOUT_SINGLE) {
+    case QD_SCHEMA_FIXEDADDRESS_FANOUT_SINGLE:
         switch(bias) {
-          case QD_SCHEMA_FIXEDADDRESS_BIAS_CLOSEST: semantics |= 
QD_BIAS_CLOSEST; break;
-          case QD_SCHEMA_FIXEDADDRESS_BIAS_SPREAD: semantics |= 
QD_BIAS_SPREAD; break;
-          default:
+        case QD_SCHEMA_FIXEDADDRESS_BIAS_CLOSEST:
+            semantics = QD_SEMANTICS_ANYCAST_CLOSEST;
+            break;
+
+        case QD_SCHEMA_FIXEDADDRESS_BIAS_SPREAD:
+            semantics = QD_SEMANTICS_ANYCAST_BALANCED;
+            break;
+
+        default:
             free(prefix);
             free(addr_phase);
             return qd_error(QD_ERROR_CONFIG, "Invalid bias value %d", fanout);
         }
-        qd_log(router->log_source, QD_LOG_INFO,
-               "Configured Address: prefix=%s phase=%d fanout=%s bias=%s",
-               prefix, phase,
-               qd_schema_fixedAddress_fanout_names[fanout],
-               qd_schema_fixedAddress_bias_names[bias]);
-    } else {
-        semantics |= QD_BIAS_NONE;
-        qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s 
phase=%d fanout=%s",
-               prefix, phase, qd_schema_fixedAddress_fanout_names[fanout]);
+        break;
+
+    default:
+        free(prefix);
+        free(addr_phase);
+        return qd_error(QD_ERROR_CONFIG, "Invalid fanout value %d", fanout);
     }
 
     addr_phase->semantics = semantics;
@@ -327,5 +326,5 @@ qd_address_semantics_t 
router_semantics_for_addr(qd_router_t *router, qd_field_i
     }
 
     qd_address_iterator_reset_view(iter, old_view);
-    return phase ? phase->semantics : QD_SEMANTICS_DEFAULT;
+    return phase ? phase->semantics : QD_SEMANTICS_ANYCAST_BALANCED;
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index bb6aad9..124f4d6 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -30,9 +30,6 @@ static void qdr_link_inbound_detach_CT(qdr_core_t *core, 
qdr_action_t *action, b
 ALLOC_DEFINE(qdr_connection_t);
 ALLOC_DEFINE(qdr_connection_work_t);
 
-static qd_address_semantics_t qdr_dynamic_semantics = QD_FANOUT_SINGLE | 
QD_BIAS_CLOSEST | QD_CONGESTION_BACKPRESSURE;
-static qd_address_semantics_t qdr_default_semantics = QD_FANOUT_SINGLE | 
QD_BIAS_SPREAD  | QD_CONGESTION_BACKPRESSURE;
-
 typedef enum {
     QDR_CONDITION_NO_ROUTE_TO_DESTINATION,
     QDR_CONDITION_ROUTED_LINK_LOST,
@@ -378,7 +375,7 @@ static qd_address_semantics_t 
qdr_semantics_for_address(qdr_core_t *core, qd_fie
     //           static routes; yes: prevents occlusion by mobile addresses 
with specified semantics)
     //
     qd_hash_retrieve_prefix(core->addr_hash, iter, (void**) &addr);
-    return addr ? addr->semantics : qdr_default_semantics;
+    return /* addr ? addr->semantics : */  QD_SEMANTICS_ANYCAST_BALANCED; // 
FIXME
 }
 
 
@@ -480,7 +477,7 @@ static qdr_address_t 
*qdr_lookup_terminus_address_CT(qdr_core_t     *core,
             qd_field_iterator_t *temp_iter = 
qd_address_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
             qd_hash_retrieve(core->addr_hash, temp_iter, (void**) &addr);
             if (!addr) {
-                addr = qdr_address(qdr_dynamic_semantics);
+                addr = qdr_address_CT(core, QD_SEMANTICS_ANYCAST_CLOSEST);
                 qd_hash_insert(core->addr_hash, temp_iter, addr, 
&addr->hash_handle);
                 DEQ_INSERT_TAIL(core->addrs, addr);
                 qdr_terminus_set_address(terminus, temp_addr);
@@ -517,7 +514,7 @@ static qdr_address_t 
*qdr_lookup_terminus_address_CT(qdr_core_t     *core,
     qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
     if (!addr && create_if_not_found) {
         qd_address_semantics_t sem = qdr_semantics_for_address(core, iter);
-        addr = qdr_address(sem);
+        addr = qdr_address_CT(core, sem);
         qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
         DEQ_INSERT_TAIL(core->addrs, addr);
     }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
new file mode 100644
index 0000000..1ab2e39
--- /dev/null
+++ b/src/router_core/forwarder.c
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+#include <qpid/dispatch/amqp.h>
+#include <stdio.h>
+
+typedef void (*qdr_forward_message_t) (qdr_core_t      *core,
+                                       qdr_forwarder_t *forw,
+                                       qd_message_t    *msg,
+                                       qdr_delivery_t  *in_delivery);
+typedef void (*qdr_forward_attach_t) (qdr_core_t      *core,
+                                      qdr_forwarder_t *forw,
+                                      qdr_link_t      *link);
+
+struct qdr_forwarder_t {
+    qdr_forward_message_t forward_message;
+    qdr_forward_attach_t  forward_attach;
+    bool                  bypass_valid_origins;
+};
+
+//==================================================================================
+// Built-in Forwarders
+//==================================================================================
+
+void qdr_forward_multicast(qdr_core_t      *core,
+                           qdr_forwarder_t *forw,
+                           qd_message_t    *msg,
+                           qdr_delivery_t  *in_delivery)
+{
+}
+
+
+void qdr_forward_closest(qdr_core_t      *core,
+                         qdr_forwarder_t *forw,
+                         qd_message_t    *msg,
+                         qdr_delivery_t  *in_delivery)
+{
+}
+
+
+void qdr_forward_balanced(qdr_core_t      *core,
+                          qdr_forwarder_t *forw,
+                          qd_message_t    *msg,
+                          qdr_delivery_t  *in_delivery)
+{
+}
+
+
+void qdr_forward_link_balanced(qdr_core_t      *core,
+                               qdr_forwarder_t *forw,
+                               qdr_link_t      *link)
+{
+}
+
+
+//==================================================================================
+// In-Thread API Functions
+//==================================================================================
+
+qdr_forwarder_t *qdr_new_forwarder(qdr_forward_message_t fm, 
qdr_forward_attach_t fa, bool bypass_valid_origins)
+{
+    qdr_forwarder_t *forw = NEW(qdr_forwarder_t);
+
+    forw->forward_message      = fm;
+    forw->forward_attach       = fa;
+    forw->bypass_valid_origins = bypass_valid_origins;
+
+    return forw;
+}
+
+
+void qdr_forwarder_setup_CT(qdr_core_t *core)
+{
+    //
+    // Create message forwarders
+    //
+    core->forwarders[QD_SEMANTICS_MULTICAST_FLOOD]  = 
qdr_new_forwarder(qdr_forward_multicast, 0, true);
+    core->forwarders[QD_SEMANTICS_MULTICAST_ONCE]   = 
qdr_new_forwarder(qdr_forward_multicast, 0, false);
+    core->forwarders[QD_SEMANTICS_ANYCAST_CLOSEST]  = 
qdr_new_forwarder(qdr_forward_closest,   0, false);
+    core->forwarders[QD_SEMANTICS_ANYCAST_BALANCED] = 
qdr_new_forwarder(qdr_forward_balanced,  0, false);
+
+    //
+    // Create link forwarders
+    //
+    core->forwarders[QD_SEMANTICS_LINK_BALANCED] = qdr_new_forwarder(0, 
qdr_forward_link_balanced, false);
+}
+
+
+qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t 
semantics)
+{
+    if (semantics <= QD_SEMANTICS_LINK_BALANCED)
+        return core->forwarders[semantics];
+    return 0;
+}
+
+
+void qdr_forward_message_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, 
qd_message_t *msg, qdr_delivery_t *in_delivery)
+{
+}
+
+
+void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, 
qdr_link_t *in_link)
+{
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index ad0a035..2790b39 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -32,8 +32,6 @@ static void qdr_unmap_destination_CT (qdr_core_t *core, 
qdr_action_t *action, bo
 static void qdr_subscribe_CT         (qdr_core_t *core, qdr_action_t *action, 
bool discard);
 static void qdr_unsubscribe_CT       (qdr_core_t *core, qdr_action_t *action, 
bool discard);
 
-static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | 
QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | 
QD_BYPASS_VALID_ORIGINS;
-
 
 
//==================================================================================
 // Interface Functions
@@ -175,11 +173,11 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
     DEQ_INIT(core->routers);
     core->addr_hash = qd_hash(10, 32, 0);
 
-    core->hello_addr      = qdr_add_local_address_CT(core, 'L', "qdhello",     
QD_SEMANTICS_ROUTER_CONTROL);
-    core->router_addr_L   = qdr_add_local_address_CT(core, 'L', "qdrouter",    
QD_SEMANTICS_ROUTER_CONTROL);
-    core->routerma_addr_L = qdr_add_local_address_CT(core, 'L', "qdrouter.ma", 
QD_SEMANTICS_DEFAULT);
-    core->router_addr_T   = qdr_add_local_address_CT(core, 'T', "qdrouter",    
QD_SEMANTICS_ROUTER_CONTROL);
-    core->routerma_addr_T = qdr_add_local_address_CT(core, 'T', "qdrouter.ma", 
QD_SEMANTICS_DEFAULT);
+    core->hello_addr      = qdr_add_local_address_CT(core, 'L', "qdhello",     
QD_SEMANTICS_MULTICAST_FLOOD);
+    core->router_addr_L   = qdr_add_local_address_CT(core, 'L', "qdrouter",    
QD_SEMANTICS_MULTICAST_FLOOD);
+    core->routerma_addr_L = qdr_add_local_address_CT(core, 'L', "qdrouter.ma", 
QD_SEMANTICS_MULTICAST_ONCE);
+    core->router_addr_T   = qdr_add_local_address_CT(core, 'T', "qdrouter",    
QD_SEMANTICS_MULTICAST_FLOOD);
+    core->routerma_addr_T = qdr_add_local_address_CT(core, 'T', "qdrouter.ma", 
QD_SEMANTICS_MULTICAST_ONCE);
 
     core->neighbor_free_mask = qd_bitmask(1);
 
@@ -235,7 +233,7 @@ static void qdr_add_router_CT(qdr_core_t *core, 
qdr_action_t *action, bool disca
         // This record will be found whenever a "foreign" topological address 
to this
         // remote router is looked up.
         //
-        addr = qdr_address(router_addr_semantics);
+        addr = qdr_address_CT(core, QD_SEMANTICS_ANYCAST_CLOSEST);
         qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
         DEQ_INSERT_TAIL(core->addrs, addr);
 
@@ -488,7 +486,7 @@ static void qdr_map_destination_CT(qdr_core_t *core, 
qdr_action_t *action, bool
 
         qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
         if (!addr) {
-            addr = qdr_address(0); // FIXME - Semantics
+            addr = qdr_address_CT(core, 0); // FIXME - Semantics
             qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
             DEQ_ITEM_INIT(addr);
             DEQ_INSERT_TAIL(core->addrs, addr);
@@ -568,7 +566,7 @@ static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t 
*action, bool discar
 
         qd_hash_retrieve(core->addr_hash, address->iterator, (void**) &addr);
         if (!addr) {
-            addr = qdr_address(action->args.io.semantics);
+            addr = qdr_address_CT(core, action->args.io.semantics);
             qd_hash_insert(core->addr_hash, address->iterator, addr, 
&addr->hash_handle);
             DEQ_ITEM_INIT(addr);
             DEQ_INSERT_TAIL(core->addrs, addr);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index a61e72c..0625784 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -156,12 +156,11 @@ void qdr_action_enqueue(qdr_core_t *core, qdr_action_t 
*action)
 }
 
 
-qdr_address_t *qdr_address(qd_address_semantics_t semantics)
+qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_semantics_t 
semantics)
 {
     qdr_address_t *addr = new_qdr_address_t();
     ZERO(addr);
-    addr->semantics = semantics;
-    addr->forwarder = qd_router_get_forwarder(semantics);
+    addr->forwarder = qdr_forwarder_CT(core, semantics);
     return addr;
 }
 
@@ -177,7 +176,7 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, 
char aclass, const cha
 
     qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
     if (!addr) {
-        addr = qdr_address(semantics);
+        addr = qdr_address_CT(core, semantics);
         qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
         DEQ_ITEM_INIT(addr);
         DEQ_INSERT_TAIL(core->addrs, addr);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index c135f11..0fec699 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -25,6 +25,12 @@
 #include <qpid/dispatch/log.h>
 #include <memory.h>
 
+typedef struct qdr_forwarder_t qdr_forwarder_t;
+
+qdr_forwarder_t *qdr_forwarder_CT(qdr_core_t *core, qd_address_semantics_t 
semantics);
+void qdr_forward_message_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, 
qd_message_t *msg, qdr_delivery_t *in_delivery);
+void qdr_forward_attach_CT(qdr_core_t *core, qdr_forwarder_t *forwarder, 
qdr_link_t *in_link);
+
 /**
  * qdr_field_t - This type is used to pass variable-length fields (strings, 
etc.) into
  *               and out of the router-core thread.
@@ -233,11 +239,10 @@ struct qdr_address_t {
     qdr_link_ref_list_t      inlinks;       ///< Locally-Connected Producers
     qdr_router_ref_list_t    rnodes;        ///< Remotely-Connected Consumers
     qd_hash_handle_t        *hash_handle;   ///< Linkage back to the hash 
table entry
-    qd_address_semantics_t   semantics;
+    qdr_forwarder_t         *forwarder;
     bool                     toggle;
     bool                     waypoint;
     bool                     block_deletion;
-    qd_router_forwarder_t   *forwarder;
 
     /**@name Statistics */
     ///@{
@@ -252,7 +257,7 @@ struct qdr_address_t {
 ALLOC_DECLARE(qdr_address_t);
 DEQ_DECLARE(qdr_address_t, qdr_address_list_t);
 
-qdr_address_t *qdr_address(qd_address_semantics_t semantics);
+qdr_address_t *qdr_address_CT(qdr_core_t *core, qd_address_semantics_t 
semantics);
 qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, char aclass, const 
char *addr, qd_address_semantics_t semantics);
 
 void qdr_add_link_ref(qdr_link_ref_list_t *ref_list, qdr_link_t *link);
@@ -385,6 +390,8 @@ struct qdr_core_t {
     qdr_node_t          **routers_by_mask_bit;
     qdr_link_t          **control_links_by_mask_bit;
     qdr_link_t          **data_links_by_mask_bit;
+
+    qdr_forwarder_t      *forwarders[QD_SEMANTICS_LINK_BALANCED + 1];
 };
 
 typedef enum {
@@ -396,6 +403,7 @@ typedef enum {
 void *router_core_thread(void *arg);
 void  qdr_route_table_setup_CT(qdr_core_t *core);
 void  qdr_agent_setup_CT(qdr_core_t *core);
+void  qdr_forwarder_setup_CT(qdr_core_t *core);
 qdr_action_t *qdr_action(qdr_action_handler_t action_handler, const char 
*label);
 void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_core/router_core_thread.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_thread.c 
b/src/router_core/router_core_thread.c
index ce4b6a8..28e979f 100644
--- a/src/router_core/router_core_thread.c
+++ b/src/router_core/router_core_thread.c
@@ -38,6 +38,7 @@ void *router_core_thread(void *arg)
 
     qdr_route_table_setup_CT(core);
     qdr_agent_setup_CT(core);
+    qdr_forwarder_setup_CT(core);
 
     qd_log(core->log, QD_LOG_INFO, "Router Core thread running. %s/%s", 
core->router_area, core->router_id);
     while (core->running) {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/fe3157c5/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 33f4eba..cde02ef 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -59,7 +59,7 @@ qd_address_t* qd_address(qd_address_semantics_t semantics)
     DEQ_INIT(addr->rlinks);
     DEQ_INIT(addr->rnodes);
     addr->semantics = semantics;
-    addr->forwarder = qd_router_get_forwarder(semantics);
+    addr->forwarder = 0; //qd_router_get_forwarder(semantics);
     return addr;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to