Repository: qpid-dispatch
Updated Branches:
  refs/heads/master a78e15815 -> 7699d55b0


DISPATCH-1181: add hint about treatment to MAU and use that on receipt if there 
is no locally defined treatment


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7699d55b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7699d55b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7699d55b

Branch: refs/heads/master
Commit: 7699d55b057e5493fef37e3b3454feb7b35edfc3
Parents: a78e158
Author: Gordon Sim <g...@redhat.com>
Authored: Fri Nov 9 22:43:10 2018 +0000
Committer: Gordon Sim <g...@redhat.com>
Committed: Mon Nov 12 10:03:46 2018 +0000

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h            |  4 +-
 python/qpid_dispatch_internal/router/data.py   | 10 ++-
 python/qpid_dispatch_internal/router/engine.py |  4 +-
 python/qpid_dispatch_internal/router/mobile.py | 15 +++-
 python/qpid_dispatch_internal/router/node.py   |  4 +-
 src/router_core/connections.c                  |  7 +-
 src/router_core/exchange_bindings.c            |  3 +-
 src/router_core/route_tables.c                 | 34 +++++++--
 src/router_core/router_core.c                  |  4 +-
 src/router_core/router_core_private.h          |  5 +-
 src/router_pynode.c                            | 10 ++-
 tests/system_tests_two_routers.py              | 84 +++++++++++++++++++++
 12 files changed, 158 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index 4d61275..b8d48ed 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -85,10 +85,10 @@ void qdr_core_set_next_hop(qdr_core_t *core, int 
router_maskbit, int nh_router_m
 void qdr_core_remove_next_hop(qdr_core_t *core, int router_maskbit);
 void qdr_core_set_cost(qdr_core_t *core, int router_maskbit, int cost);
 void qdr_core_set_valid_origins(qdr_core_t *core, int router_maskbit, 
qd_bitmask_t *routers);
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char 
*address_hash);
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char 
*address_hash, int treatment_hint);
 void qdr_core_unmap_destination(qdr_core_t *core, int router_maskbit, const 
char *address_hash);
 
-typedef void (*qdr_mobile_added_t)   (void *context, const char *address_hash);
+typedef void (*qdr_mobile_added_t)   (void *context, const char *address_hash, 
qd_address_treatment_t treatment);
 typedef void (*qdr_mobile_removed_t) (void *context, const char *address_hash);
 typedef void (*qdr_link_lost_t)      (void *context, int link_maskbit);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/python/qpid_dispatch_internal/router/data.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/data.py 
b/python/qpid_dispatch_internal/router/data.py
index 6c1db7f..810f98d 100644
--- a/python/qpid_dispatch_internal/router/data.py
+++ b/python/qpid_dispatch_internal/router/data.py
@@ -250,7 +250,7 @@ class MessageLSR(object):
 class MessageMAU(object):
     """
     """
-    def __init__(self, body, _id=None, _seq=None, _add_list=None, 
_del_list=None, _exist_list=None):
+    def __init__(self, body, _id=None, _seq=None, _add_list=None, 
_del_list=None, _exist_list=None, _hints=None):
         if body:
             self.id = getMandatory(body, 'id', PY_TEXT_TYPE)
             self.version = getOptional(body, 'pv', 0, PY_LONG_TYPE)
@@ -259,6 +259,7 @@ class MessageMAU(object):
             self.add_list = getOptional(body, 'add', None, list)
             self.del_list = getOptional(body, 'del', None, list)
             self.exist_list = getOptional(body, 'exist', None, list)
+            self.hints = getOptional(body, 'hints', None, list)
         else:
             self.id = _id
             self.version = ProtocolVersion
@@ -267,6 +268,7 @@ class MessageMAU(object):
             self.add_list = _add_list
             self.del_list = _del_list
             self.exist_list = _exist_list
+            self.hints = _hints
 
     def get_opcode(self):
         return 'MAU'
@@ -278,8 +280,9 @@ class MessageMAU(object):
         if self.add_list != None:   _add   = ' add=%r'   % self.add_list
         if self.del_list != None:   _del   = ' del=%r'   % self.del_list
         if self.exist_list != None: _exist = ' exist=%r' % self.exist_list
-        return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s)" % \
-                (self.id, self.version, self.area, self.mobile_seq, _add, 
_del, _exist)
+        if self.hints != None: _hints = ' hints=%r' % self.hints
+        return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s%s)" % \
+                (self.id, self.version, self.area, self.mobile_seq, _add, 
_del, _exist, _hints)
 
     def to_dict(self):
         body = {'id'         : self.id,
@@ -289,6 +292,7 @@ class MessageMAU(object):
         if self.add_list != None:   body['add']   = self.add_list
         if self.del_list != None:   body['del']   = self.del_list
         if self.exist_list != None: body['exist'] = self.exist_list
+        if self.hints != None: body['hints'] = self.hints
         return body
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/python/qpid_dispatch_internal/router/engine.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/engine.py 
b/python/qpid_dispatch_internal/router/engine.py
index 13d7a78..753d743 100644
--- a/python/qpid_dispatch_internal/router/engine.py
+++ b/python/qpid_dispatch_internal/router/engine.py
@@ -98,12 +98,12 @@ class RouterEngine(object):
                 raise ValueError("No router configuration found")
         return self._config
 
-    def addressAdded(self, addr):
+    def addressAdded(self, addr, treatment):
         """
         """
         try:
             if addr[0] in 'MCDEFH':
-                self.mobile_address_engine.add_local_address(addr)
+                self.mobile_address_engine.add_local_address(addr, treatment)
         except Exception:
             self.log_ma(LOG_ERROR, "Exception in new-address processing\n%s" % 
format_exc(LOG_STACK_LIMIT))
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/python/qpid_dispatch_internal/router/mobile.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/mobile.py 
b/python/qpid_dispatch_internal/router/mobile.py
index 6e768c2..b4c73f9 100644
--- a/python/qpid_dispatch_internal/router/mobile.py
+++ b/python/qpid_dispatch_internal/router/mobile.py
@@ -43,6 +43,7 @@ class MobileAddressEngine(object):
         self.added_addrs   = []
         self.deleted_addrs = []
         self.sent_deltas   = {}
+        self.treatments    = {}
 
 
     def tick(self, now):
@@ -52,7 +53,8 @@ class MobileAddressEngine(object):
         ##
         if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
             self.mobile_seq += 1
-            msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs, 
self.deleted_addrs)
+            hints = [self.treatments[a] for a in self.added_addrs]
+            msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs, 
self.deleted_addrs, _hints=hints)
 
             self.sent_deltas[self.mobile_seq] = msg
             if len(self.sent_deltas) > MAX_KEPT_DELTAS:
@@ -68,9 +70,10 @@ class MobileAddressEngine(object):
         return self.mobile_seq
 
 
-    def add_local_address(self, addr):
+    def add_local_address(self, addr, treatment):
         """
         """
+        self.treatments[addr] = treatment
         if self.local_addrs.count(addr) == 0:
             if self.added_addrs.count(addr) == 0:
                 self.added_addrs.append(addr)
@@ -82,6 +85,7 @@ class MobileAddressEngine(object):
     def del_local_address(self, addr):
         """
         """
+        del self.treatments[addr]
         if self.local_addrs.count(addr) > 0:
             if self.deleted_addrs.count(addr) == 0:
                 self.deleted_addrs.append(addr)
@@ -118,8 +122,13 @@ class MobileAddressEngine(object):
                 ## This message represents the next expected sequence, 
incorporate the deltas
                 ##
                 node.mobile_address_sequence += 1
+                treatments = msg.hints or []
                 for a in msg.add_list:
-                    node.map_address(a)
+                    if len(treatments):
+                        treatment = treatments.pop(0)
+                    else:
+                        treatment = -1
+                    node.map_address(a, treatment)
                 for a in msg.del_list:
                     node.unmap_address(a)
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/python/qpid_dispatch_internal/router/node.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/router/node.py 
b/python/qpid_dispatch_internal/router/node.py
index 8d8841e..16930fb 100644
--- a/python/qpid_dispatch_internal/router/node.py
+++ b/python/qpid_dispatch_internal/router/node.py
@@ -541,9 +541,9 @@ class RouterNode(object):
         return False
 
 
-    def map_address(self, addr):
+    def map_address(self, addr, treatment):
         self.mobile_addresses.append(addr)
-        self.adapter.map_destination(addr, self.maskbit)
+        self.adapter.map_destination(addr, treatment, self.maskbit)
         self.log(LOG_DEBUG, "Remote destination %s mapped to router %s" % 
(self._logify(addr), self.id))
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 1ca6324..db271f4 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1015,12 +1015,17 @@ qd_address_treatment_t 
qdr_treatment_for_address_CT(qdr_core_t *core, qdr_connec
 
 qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, 
qd_iterator_t *iter)
 {
+    return qdr_treatment_for_address_hash_with_default_CT(core, iter, 
core->qd->default_treatment);
+}
+
+qd_address_treatment_t 
qdr_treatment_for_address_hash_with_default_CT(qdr_core_t *core, qd_iterator_t 
*iter, qd_address_treatment_t default_treatment)
+{
 #define HASH_STORAGE_SIZE 1000
     char  storage[HASH_STORAGE_SIZE + 1];
     char *copy    = storage;
     bool  on_heap = false;
     int   length  = qd_iterator_length(iter);
-    qd_address_treatment_t trt = core->qd->default_treatment;
+    qd_address_treatment_t trt = default_treatment;
 
     if (length > HASH_STORAGE_SIZE) {
         copy    = (char*) malloc(length + 1);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/exchange_bindings.c
----------------------------------------------------------------------
diff --git a/src/router_core/exchange_bindings.c 
b/src/router_core/exchange_bindings.c
index 43a5ca4..15d369e 100644
--- a/src/router_core/exchange_bindings.c
+++ b/src/router_core/exchange_bindings.c
@@ -927,7 +927,8 @@ static qdr_exchange_t *qdr_exchange(qdr_core_t *core,
         }
 
         qdr_post_mobile_added_CT(core,
-                                 (const char*) 
qd_hash_key_by_handle(ex->qdr_addr->hash_handle));
+                                 (const char*) 
qd_hash_key_by_handle(ex->qdr_addr->hash_handle),
+                                 ex->qdr_addr->treatment);
     }
 
     return ex;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 84834fe..28b1414 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -108,11 +108,12 @@ void qdr_core_set_valid_origins(qdr_core_t *core, int 
router_maskbit, qd_bitmask
 }
 
 
-void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char 
*address_hash)
+void qdr_core_map_destination(qdr_core_t *core, int router_maskbit, const char 
*address_hash, int treatment_hint)
 {
     qdr_action_t *action = qdr_action(qdr_map_destination_CT, 
"map_destination");
     action->args.route_table.router_maskbit = router_maskbit;
     action->args.route_table.address        = qdr_field(address_hash);
+    action->args.route_table.treatment_hint = treatment_hint;
     qdr_action_enqueue(core, action);
 }
 
@@ -557,11 +558,30 @@ static void qdr_set_valid_origins_CT(qdr_core_t *core, 
qdr_action_t *action, boo
         qd_bitmask_free(valid_origins);
 }
 
+static qd_address_treatment_t default_treatment(qdr_core_t *core, int hint) {
+    switch (hint) {
+    case QD_TREATMENT_MULTICAST_FLOOD:
+        return QD_TREATMENT_MULTICAST_FLOOD;
+    case QD_TREATMENT_MULTICAST_ONCE:
+        return QD_TREATMENT_MULTICAST_ONCE;
+    case QD_TREATMENT_ANYCAST_CLOSEST:
+        return QD_TREATMENT_ANYCAST_CLOSEST;
+    case QD_TREATMENT_ANYCAST_BALANCED:
+        return QD_TREATMENT_ANYCAST_BALANCED;
+    case QD_TREATMENT_LINK_BALANCED:
+        return QD_TREATMENT_LINK_BALANCED;
+    case QD_TREATMENT_UNAVAILABLE:
+        return QD_TREATMENT_UNAVAILABLE;
+    default:
+        return core->qd->default_treatment == QD_TREATMENT_UNAVAILABLE ? 
QD_TREATMENT_ANYCAST_BALANCED : core->qd->default_treatment;
+    }
+}
 
 static void qdr_map_destination_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
 {
     int          router_maskbit = action->args.route_table.router_maskbit;
     qdr_field_t *address        = action->args.route_table.address;
+    int          treatment_hint = action->args.route_table.treatment_hint;
 
     if (discard) {
         qdr_field_free(address);
@@ -584,8 +604,11 @@ static void qdr_map_destination_CT(qdr_core_t *core, 
qdr_action_t *action, bool
 
         qd_hash_retrieve(core->addr_hash, iter, (void**) &addr);
         if (!addr) {
-            addr = qdr_address_CT(core, 
qdr_treatment_for_address_hash_CT(core, iter));
-            if (!addr) break;
+            addr = qdr_address_CT(core, 
qdr_treatment_for_address_hash_with_default_CT(core, iter, 
default_treatment(core, treatment_hint)));
+            if (!addr) {
+                qd_log(core->log, QD_LOG_CRITICAL, "map_destination: ignored");
+                break;
+            }
             qd_hash_insert(core->addr_hash, iter, addr, &addr->hash_handle);
             DEQ_ITEM_INIT(addr);
             DEQ_INSERT_TAIL(core->addrs, addr);
@@ -730,7 +753,7 @@ static void qdr_do_mobile_added(qdr_core_t *core, 
qdr_general_work_t *work)
 {
     char *address_hash = qdr_field_copy(work->field);
     if (address_hash) {
-        core->rt_mobile_added(core->rt_context, address_hash);
+        core->rt_mobile_added(core->rt_context, address_hash, work->treatment);
         free(address_hash);
     }
 
@@ -756,10 +779,11 @@ static void qdr_do_link_lost(qdr_core_t *core, 
qdr_general_work_t *work)
 }
 
 
-void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash)
+void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, 
qd_address_treatment_t treatment)
 {
     qdr_general_work_t *work = qdr_general_work(qdr_do_mobile_added);
     work->field = qdr_field(address_hash);
+    work->treatment = treatment;
     qdr_post_general_work_CT(core, work);
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index af7a736..6b7e2b8 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -431,7 +431,7 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, 
qdr_address_t *addr, qdr_li
         if (DEQ_SIZE(addr->rlinks) == 1) {
             const char *key = (const char*) 
qd_hash_key_by_handle(addr->hash_handle);
             if (key && (*key == QD_ITER_HASH_PREFIX_EDGE_SUMMARY || *key == 
QD_ITER_HASH_PREFIX_MOBILE))
-                qdr_post_mobile_added_CT(core, key);
+                qdr_post_mobile_added_CT(core, key, addr->treatment);
             qdr_addr_start_inlinks_CT(core, addr);
             qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, 
addr);
         } else if (DEQ_SIZE(addr->rlinks) == 2 && 
qd_bitmask_cardinality(addr->rnodes) == 0)
@@ -476,7 +476,7 @@ void qdr_core_bind_address_conn_CT(qdr_core_t *core, 
qdr_address_t *addr, qdr_co
     qdr_add_connection_ref(&addr->conns, conn);
     if (DEQ_SIZE(addr->conns) == 1) {
         const char *key = (const char*) 
qd_hash_key_by_handle(addr->hash_handle);
-        qdr_post_mobile_added_CT(core, key);
+        qdr_post_mobile_added_CT(core, key, addr->treatment);
         qdrc_event_addr_raise(core, QDRC_EVENT_ADDR_BECAME_LOCAL_DEST, addr);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index ec51fd7..19a1d1e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -94,6 +94,7 @@ struct qdr_action_t {
             int           router_maskbit;
             int           nh_router_maskbit;
             int           cost;
+            int           treatment_hint;
             qd_bitmask_t *router_set;
             qdr_field_t  *address;
         } route_table;
@@ -193,6 +194,7 @@ struct qdr_general_work_t {
     void                       *on_message_context;
     qd_message_t               *msg;
     uint64_t                    in_conn_id;
+    int                         treatment;
 };
 
 ALLOC_DECLARE(qdr_general_work_t);
@@ -884,7 +886,7 @@ qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t 
*dlv);
 
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 
-void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash);
+void qdr_post_mobile_added_CT(qdr_core_t *core, const char *address_hash, 
qd_address_treatment_t treatment);
 void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash);
 void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
 
@@ -897,6 +899,7 @@ void qdr_connection_free(qdr_connection_t *conn);
 void qdr_connection_activate_CT(qdr_core_t *core, qdr_connection_t *conn);
 qd_address_treatment_t qdr_treatment_for_address_CT(qdr_core_t *core, 
qdr_connection_t *conn, qd_iterator_t *iter, int *in_phase, int *out_phase, int 
*priority);
 qd_address_treatment_t qdr_treatment_for_address_hash_CT(qdr_core_t *core, 
qd_iterator_t *iter);
+qd_address_treatment_t 
qdr_treatment_for_address_hash_with_default_CT(qdr_core_t *core, qd_iterator_t 
*iter, qd_address_treatment_t default_treatment);
 qdr_edge_t *qdr_edge(qdr_core_t *);
 void qdr_edge_free(qdr_edge_t *);
 void qdr_edge_connection_opened(qdr_edge_t *edge, qdr_connection_t *conn);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/src/router_pynode.c
----------------------------------------------------------------------
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 1d64cfc..1d0c8d5 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -242,9 +242,10 @@ static PyObject* qd_map_destination(PyObject *self, 
PyObject *args)
     RouterAdapter *adapter = (RouterAdapter*) self;
     qd_router_t   *router  = adapter->router;
     const char    *addr_string;
+    int            treatment;
     int            maskbit;
 
-    if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
+    if (!PyArg_ParseTuple(args, "sii", &addr_string, &treatment, &maskbit))
         return 0;
 
     if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -252,7 +253,7 @@ static PyObject* qd_map_destination(PyObject *self, 
PyObject *args)
         return 0;
     }
 
-    qdr_core_map_destination(router->router_core, maskbit, addr_string);
+    qdr_core_map_destination(router->router_core, maskbit, addr_string, 
treatment);
 
     Py_INCREF(Py_None);
     return Py_None;
@@ -316,7 +317,7 @@ static PyTypeObject RouterAdapterType = {
 };
 
 
-static void qd_router_mobile_added(void *context, const char *address_hash)
+static void qd_router_mobile_added(void *context, const char *address_hash, 
qd_address_treatment_t treatment)
 {
     qd_router_t *router = (qd_router_t*) context;
     PyObject    *pArgs;
@@ -324,8 +325,9 @@ static void qd_router_mobile_added(void *context, const 
char *address_hash)
 
     if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
         qd_python_lock_state_t lock_state = qd_python_lock();
-        pArgs = PyTuple_New(1);
+        pArgs = PyTuple_New(2);
         PyTuple_SetItem(pArgs, 0, PyUnicode_FromString(address_hash));
+        PyTuple_SetItem(pArgs, 1, PyLong_FromLong((long) treatment));
         pValue = PyObject_CallObject(pyAdded, pArgs);
         qd_error_py();
         Py_DECREF(pArgs);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7699d55b/tests/system_tests_two_routers.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_two_routers.py 
b/tests/system_tests_two_routers.py
index 615f059..778b40f 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -1382,6 +1382,90 @@ class TwoRouterConnection(TestCase):
 
         self.assertTrue(self.success)
 
+class PropagationTest(TestCase):
+
+    inter_router_port = None
+
+    @classmethod
+    def setUpClass(cls):
+        """Start a router and a messenger"""
+        super(PropagationTest, cls).setUpClass()
+
+        def router(name, extra_config):
+
+            config = [
+                ('router', {'mode': 'interior', 'id': 'QDR.%s'%name}),
+
+                ('listener', {'port': cls.tester.get_port()}),
+
+            ] + extra_config
+
+            config = Qdrouterd.Config(config)
+
+            cls.routers.append(cls.tester.qdrouterd(name, config, wait=True))
+
+        cls.routers = []
+
+        inter_router_port = cls.tester.get_port()
+        router('A', [('listener', {'role': 'inter-router', 'port': 
inter_router_port}), ('address', {'prefix': 'multicast', 'distribution': 
'multicast'})])
+        router('B', [('connector', {'role': 'inter-router', 'port': 
inter_router_port})])
+
+        cls.routers[0].wait_router_connected('QDR.B')
+        cls.routers[1].wait_router_connected('QDR.A')
+
+    def test_propagation_of_locally_undefined_address(self):
+        test = MulticastTestClient(self.routers[0].addresses[0], 
self.routers[1].addresses[0])
+        test.run()
+        self.assertEqual(None, test.error)
+        self.assertEqual(test.received, 2)
+
+class CreateReceiver(MessagingHandler):
+    def __init__(self, connection, address):
+        super(CreateReceiver, self).__init__()
+        self.connection = connection
+        self.address = address
+
+    def on_timer_task(self, event):
+        event.container.create_receiver(self.connection, self.address)
+
+class DelayedSend(MessagingHandler):
+    def __init__(self, connection, address, message):
+        super(DelayedSend, self).__init__()
+        self.connection = connection
+        self.address = address
+        self.message = message
+
+    def on_timer_task(self, event):
+        event.container.create_sender(self.connection, 
self.address).send(self.message)
+
+class MulticastTestClient(MessagingHandler):
+    def __init__(self, router1, router2):
+        super(MulticastTestClient, self).__init__()
+        self.routers = [router1, router2]
+        self.received = 0
+        self.error = None
+
+    def on_start(self, event):
+        self.connections = [event.container.connect(r) for r in self.routers]
+        event.container.create_receiver(self.connections[0], "multicast")
+        # wait for knowledge of receiver1 to propagate to second router
+        event.container.schedule(5, CreateReceiver(self.connections[1], 
"multicast"))
+        event.container.schedule(7, DelayedSend(self.connections[1], 
"multicast", Message(body="testing1,2,3")))
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+
+    def on_message(self, event):
+        self.received += 1
+        event.connection.close()
+        if self.received == 2:
+            self.timer.cancel()
+
+    def timeout(self):
+        self.error = "Timeout Expired:received=%d" % self.received
+        for c in self.connections:
+            c.close()
+
+    def run(self):
+        Container(self).run()
 
 if __name__ == '__main__':
     unittest.main(main_module())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to