Author: tross
Date: Wed Jan 21 15:22:57 2015
New Revision: 1653556

URL: http://svn.apache.org/r1653556
Log:
DISPATCH-97 - Major refactor and redesign of the Python Router module.
  - Removed the distinction between neighbor and remote routers.
    Now, routers either have a link (neighbor) or a next-hop (remote).
  - Added a startup-sequence field to detect rapid-restart of a router.
  - Request messages are not sent until there is a route to the destination.
  - Mobile-address updates are now broadcast on a separate address that does not
    bypass valid-origin checks.  Only one copy is delivered to each router in
    the network.
  - Router logging has been divided into sub-protocols for better monitoring and
    troubleshooting.
  - A 6-node test configuration has been added.

Added:
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py
      - copied, changed from r1653545, qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py
    qpid/dispatch/trunk/tests/config-6/
    qpid/dispatch/trunk/tests/config-6/A.conf
    qpid/dispatch/trunk/tests/config-6/B.conf
    qpid/dispatch/trunk/tests/config-6/C.conf
    qpid/dispatch/trunk/tests/config-6/D.conf
    qpid/dispatch/trunk/tests/config-6/X.conf
    qpid/dispatch/trunk/tests/config-6/Y.conf
    qpid/dispatch/trunk/tests/config-6/topology.txt
      - copied, changed from r1650165, qpid/dispatch/trunk/tests/config-3-linear/topology.txt
Removed:
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/routing.py
Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/ctools.h
    qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py
    qpid/dispatch/trunk/src/python_embedded.c
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/src/router_private.h
    qpid/dispatch/trunk/src/router_pynode.c
    qpid/dispatch/trunk/tests/config-2/A.conf
    qpid/dispatch/trunk/tests/config-2/B.conf
    qpid/dispatch/trunk/tests/config-3-linear/A.conf
    qpid/dispatch/trunk/tests/config-3-linear/B.conf
    qpid/dispatch/trunk/tests/config-3-linear/C.conf
    qpid/dispatch/trunk/tests/router_engine_test.py

Modified: qpid/dispatch/trunk/include/qpid/dispatch/ctools.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/ctools.h?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/ctools.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/ctools.h Wed Jan 21 15:22:57 2015
@@ -54,7 +54,7 @@
  *@pre ptr points to first element of deq
  *@post ptr points to first element of deq that passes test, or 0. Test should involve ptr.
  */
-#define DEQ_FIND(ptr, test) while((ptr) && !(test)) ptr = DEQ_NEXT(ptr);
+#define DEQ_FIND(ptr,test) while((ptr) && !(test)) ptr = DEQ_NEXT(ptr);
 
 #define DEQ_INSERT_HEAD(d,i)      \
 do {                              \

Modified: qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json Wed Jan 
21 15:22:57 2015
@@ -260,6 +260,9 @@
                 "module": {
                     "type":[
                         "ROUTER",
+                        "ROUTER_HELLO",
+                        "ROUTER_LS",
+                        "ROUTER_MA",
                         "MESSAGE",
                         "SERVER",
                         "AGENT",

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py Wed Jan 21 
15:22:57 2015
@@ -46,16 +46,16 @@ class LinkState(object):
     The link-state of a single router.  The link state consists of a list of 
neighbor routers reachable from
     the reporting router.  The link-state-sequence number is incremented each 
time the link state changes.
     """
-    def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None):
+    def __init__(self, body, _id=None, _ls_seq=None, _peers=None):
         self.last_seen = 0
         if body:
             self.id = getMandatory(body, 'id', str)
-            self.area = getMandatory(body, 'area', str)
+            self.area = '0'
             self.ls_seq = getMandatory(body, 'ls_seq', long)
             self.peers = getMandatory(body, 'peers', list)
         else:
             self.id = _id
-            self.area = _area
+            self.area = '0'
             self.ls_seq = long(_ls_seq)
             self.peers = _peers
 
@@ -87,6 +87,9 @@ class LinkState(object):
     def has_peers(self):
         return len(self.peers) > 0
 
+    def is_peer(self, _id):
+        return _id in self.peers
+
     def bump_sequence(self):
         self.ls_seq += 1
 
@@ -98,15 +101,15 @@ class MessageHELLO(object):
     This message is used by directly connected routers to determine with whom 
they have
     bidirectional connectivity.
     """
-    def __init__(self, body, _id=None, _area=None, _seen_peers=None, 
_instance=long(0)):
+    def __init__(self, body, _id=None, _seen_peers=None, _instance=long(0)):
         if body:
             self.id = getMandatory(body, 'id', str)
-            self.area = getMandatory(body, 'area', str)
+            self.area = '0'
             self.seen_peers = getMandatory(body, 'seen', list)
             self.instance = getOptional(body, 'instance', 0, long)
         else:
             self.id   = _id
-            self.area = _area
+            self.area = '0'
             self.seen_peers = _seen_peers
             self.instance = _instance
 
@@ -133,16 +136,16 @@ class MessageRA(object):
     This message is sent periodically to indicate the originating router's 
sequence numbers
     for link-state and mobile-address-state.
     """
-    def __init__(self, body, _id=None, _area=None, _ls_seq=None, 
_mobile_seq=None, _instance=long(0)):
+    def __init__(self, body, _id=None, _ls_seq=None, _mobile_seq=None, 
_instance=long(0)):
         if body:
             self.id = getMandatory(body, 'id', str)
-            self.area = getMandatory(body, 'area', str)
+            self.area = '0'
             self.ls_seq = getMandatory(body, 'ls_seq', long)
             self.mobile_seq = getMandatory(body, 'mobile_seq', long)
             self.instance = getOptional(body, 'instance', 0, long)
         else:
             self.id = _id
-            self.area = _area
+            self.area = '0'
             self.ls_seq = long(_ls_seq)
             self.mobile_seq = long(_mobile_seq)
             self.instance = _instance
@@ -165,16 +168,16 @@ class MessageRA(object):
 class MessageLSU(object):
     """
     """
-    def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None, 
_instance=long(0)):
+    def __init__(self, body, _id=None, _ls_seq=None, _ls=None, 
_instance=long(0)):
         if body:
             self.id = getMandatory(body, 'id', str)
-            self.area = getMandatory(body, 'area', str)
+            self.area = '0'
             self.ls_seq = getMandatory(body, 'ls_seq', long)
             self.ls = LinkState(getMandatory(body, 'ls', dict))
             self.instance = getOptional(body, 'instance', 0, long)
         else:
             self.id = _id
-            self.area = _area
+            self.area = '0'
             self.ls_seq = long(_ls_seq)
             self.ls = _ls
             self.instance = _instance
@@ -197,13 +200,13 @@ class MessageLSU(object):
 class MessageLSR(object):
     """
     """
-    def __init__(self, body, _id=None, _area=None):
+    def __init__(self, body, _id=None):
         if body:
             self.id = getMandatory(body, 'id', str)
-            self.area = getMandatory(body, 'area', str)
+            self.area = '0'
         else:
             self.id = _id
-            self.area = _area
+            self.area = '0'
 
     def get_opcode(self):
         return 'LSR'
@@ -219,17 +222,17 @@ class MessageLSR(object):
 class MessageMAU(object):
     """
     """
-    def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None, 
_del_list=None, _exist_list=None):
+    def __init__(self, body, _id=None, _seq=None, _add_list=None, 
_del_list=None, _exist_list=None):
         if body:
             self.id = getMandatory(body, 'id', str)
-            self.area = getMandatory(body, 'area', str)
+            self.area = '0'
             self.mobile_seq = getMandatory(body, 'mobile_seq', long)
             self.add_list = getOptional(body, 'add', None, list)
             self.del_list = getOptional(body, 'del', None, list)
             self.exist_list = getOptional(body, 'exist', None, list)
         else:
             self.id = _id
-            self.area = _area
+            self.area = '0'
             self.mobile_seq = long(_seq)
             self.add_list = _add_list
             self.del_list = _del_list
@@ -261,14 +264,14 @@ class MessageMAU(object):
 class MessageMAR(object):
     """
     """
-    def __init__(self, body, _id=None, _area=None, _have_seq=None):
+    def __init__(self, body, _id=None, _have_seq=None):
         if body:
             self.id = getMandatory(body, 'id', str)
-            self.area = getMandatory(body, 'area', str)
+            self.area = '0'
             self.have_seq = getMandatory(body, 'have_seq', long)
         else:
             self.id = _id
-            self.area = _area
+            self.area = '0'
             self.have_seq = long(_have_seq)
 
     def get_opcode(self):

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py 
(original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py Wed Jan 
21 15:22:57 2015
@@ -18,11 +18,10 @@
 #
 
 from data import MessageHELLO, MessageRA, MessageLSU, MessageMAU, MessageMAR, 
MessageLSR
-from neighbor import NeighborEngine
+from hello import HelloProtocol
 from link import LinkStateEngine
 from path import PathEngine
 from mobile import MobileAddressEngine
-from routing import RoutingTableEngine
 from node import NodeTracker
 from message import Message
 
@@ -40,7 +39,7 @@ class RouterEngine:
     """
     """
 
-    def __init__(self, router_adapter, router_id, area, max_routers, 
config_override={}):
+    def __init__(self, router_adapter, router_id, area_unused, max_routers, 
config_override={}):
         """
         Initialize an instance of a router for a domain.
         """
@@ -49,25 +48,28 @@ class RouterEngine:
         ##
         self.domain         = "domain"
         self.router_adapter = router_adapter
-        self.log_adapter    = LogAdapter("ROUTER")
+        self._config        = None # Not yet loaded
+        self._log_hello     = LogAdapter("ROUTER_HELLO")
+        self._log_ls        = LogAdapter("ROUTER_LS")
+        self._log_ma        = LogAdapter("ROUTER_MA")
+        self._log_general   = LogAdapter("ROUTER")
         self.io_adapter     = [IoAdapter(self.receive, "qdrouter"),
+                               IoAdapter(self.receive, "qdrouter.ma"),
                                IoAdapter(self.receive, "qdhello")]
         self.max_routers    = max_routers
         self.id             = router_id
-        self.area           = area
         self.instance       = long(time.time())
-        self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s 
instance=%d max_routers=%d" %
-                 (self.area, self.id, self.instance, self.max_routers))
-        self._config         = None # Not yet loaded
+        self.log(LOG_INFO, "Router Engine Instantiated: id=%s instance=%d 
max_routers=%d" %
+                 (self.id, self.instance, self.max_routers))
+
         ##
         ## Launch the sub-module engines
         ##
         self.node_tracker          = NodeTracker(self, self.max_routers)
-        self.neighbor_engine       = NeighborEngine(self)
+        self.hello_protocol        = HelloProtocol(self, self.node_tracker)
         self.link_state_engine     = LinkStateEngine(self)
         self.path_engine           = PathEngine(self)
         self.mobile_address_engine = MobileAddressEngine(self, 
self.node_tracker)
-        self.routing_table_engine  = RoutingTableEngine(self, 
self.node_tracker)
 
 
     
##========================================================================================
@@ -90,12 +92,10 @@ class RouterEngine:
         """
         """
         try:
-            if addr.find('Mtemp.') == 0:  ## This is a temporary measure until 
dynamic is added to Messenger
-                return
             if addr.find('M') == 0:
                 self.mobile_address_engine.add_local_address(addr[1:])
         except Exception, e:
-            self.log(LOG_ERROR, "Exception in new-address processing: 
exception=%r" % e)
+            self.log_ma(LOG_ERROR, "Exception in new-address processing: 
exception=%r" % e)
             exc_type, exc_value, exc_traceback = sys.exc_info()
             traceback.print_tb(exc_traceback)
 
@@ -104,12 +104,10 @@ class RouterEngine:
         """
         """
         try:
-            if addr.find('Mtemp.') == 0:
-                return
             if addr.find('M') == 0:
                 self.mobile_address_engine.del_local_address(addr[1:])
         except Exception, e:
-            self.log(LOG_ERROR, "Exception in del-address processing: 
exception=%r" % e)
+            self.log_ma(LOG_ERROR, "Exception in del-address processing: 
exception=%r" % e)
             exc_type, exc_value, exc_traceback = sys.exc_info()
             traceback.print_tb(exc_traceback)
 
@@ -117,8 +115,7 @@ class RouterEngine:
     def linkLost(self, link_id):
         """
         """
-        self.log(LOG_INFO, "Router Link Lost - link_id=%d" % link_id)
-        self.neighbor_engine.linkLost(link_id)
+        self.node_tracker.link_lost(link_id)
 
 
     def handleTimerTick(self):
@@ -126,11 +123,8 @@ class RouterEngine:
         """
         try:
             now = time.time()
-            self.neighbor_engine.tick(now)
+            self.hello_protocol.tick(now)
             self.link_state_engine.tick(now)
-            self.path_engine.tick(now)
-            self.mobile_address_engine.tick(now)
-            self.routing_table_engine.tick(now)
             self.node_tracker.tick(now)
         except Exception, e:
             self.log(LOG_ERROR, "Exception in timer processing: exception=%r" 
% e)
@@ -145,33 +139,32 @@ class RouterEngine:
             now = time.time()
             if   opcode == 'HELLO':
                 msg = MessageHELLO(body)
-                self.log(LOG_TRACE, "RCVD: %r" % msg)
-                self.neighbor_engine.handle_hello(msg, now, link_id)
+                self.log_hello(LOG_TRACE, "RCVD: %r" % msg)
+                self.hello_protocol.handle_hello(msg, now, link_id)
 
             elif opcode == 'RA':
                 msg = MessageRA(body)
-                self.log(LOG_TRACE, "RCVD: %r" % msg)
+                self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
                 self.link_state_engine.handle_ra(msg, now)
-                self.mobile_address_engine.handle_ra(msg, now)
 
             elif opcode == 'LSU':
                 msg = MessageLSU(body)
-                self.log(LOG_DEBUG, "RCVD: %r" % msg)
+                self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
                 self.link_state_engine.handle_lsu(msg, now)
 
             elif opcode == 'LSR':
                 msg = MessageLSR(body)
-                self.log(LOG_DEBUG, "RCVD: %r" % msg)
+                self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
                 self.link_state_engine.handle_lsr(msg, now)
 
             elif opcode == 'MAU':
                 msg = MessageMAU(body)
-                self.log(LOG_DEBUG, "RCVD: %r" % msg)
+                self.log_ma(LOG_TRACE, "RCVD: %r" % msg)
                 self.mobile_address_engine.handle_mau(msg, now)
 
             elif opcode == 'MAR':
                 msg = MessageMAR(body)
-                self.log(LOG_DEBUG, "RCVD: %r" % msg)
+                self.log_ma(LOG_TRACE, "RCVD: %r" % msg)
                 self.mobile_address_engine.handle_mar(msg, now)
 
         except Exception, e:
@@ -203,7 +196,6 @@ class RouterEngine:
                      'next-hops'      : "Next hops to each known router"
                      }
         if kind == 'link-state'     : return 
self.neighbor_engine.link_state.to_dict()
-        if kind == 'next-hops'      : return 
self.routing_table_engine.next_hops
         if kind == 'link-state-set' :
             copy = {}
             for _id,_ls in self.link_state_engine.collection.items():
@@ -221,7 +213,31 @@ class RouterEngine:
         Emit a log message to the host's event log
         """
         info = traceback.extract_stack(limit=2)[0] # Caller frame info
-        self.log_adapter.log(level, text, info[0], info[1])
+        self._log_general.log(level, text, info[0], info[1])
+
+
+    def log_hello(self, level, text):
+        """
+        Emit a log message to the host's event log
+        """
+        info = traceback.extract_stack(limit=2)[0] # Caller frame info
+        self._log_hello.log(level, text, info[0], info[1])
+
+
+    def log_ls(self, level, text):
+        """
+        Emit a log message to the host's event log
+        """
+        info = traceback.extract_stack(limit=2)[0] # Caller frame info
+        self._log_ls.log(level, text, info[0], info[1])
+
+
+    def log_ma(self, level, text):
+        """
+        Emit a log message to the host's event log
+        """
+        info = traceback.extract_stack(limit=2)[0] # Caller frame info
+        self._log_ma.log(level, text, info[0], info[1])
 
 
     def send(self, dest, msg):
@@ -230,7 +246,6 @@ class RouterEngine:
         """
         app_props = {'opcode' : msg.get_opcode() }
         self.io_adapter[0].send(Message(address=dest, properties=app_props, 
body=msg.to_dict()))
-        self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
 
 
     def node_updated(self, addr, reachable, neighbor):
@@ -238,71 +253,3 @@ class RouterEngine:
         """
         self.router_adapter(addr, reachable, neighbor)
 
-
-    
##========================================================================================
-    ## Interconnect between the Sub-Modules
-    
##========================================================================================
-    def local_link_state_changed(self, link_state):
-        self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
-        self.link_state_engine.new_local_link_state(link_state)
-
-    def ls_collection_changed(self, collection):
-        self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
-        self.path_engine.ls_collection_changed(collection)
-
-    def next_hops_changed(self, next_hop_table):
-        self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
-        self.routing_table_engine.next_hops_changed(next_hop_table)
-
-    def valid_origins_changed(self, valid_origins):
-        self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins)
-        self.routing_table_engine.valid_origins_changed(valid_origins)
-
-    def mobile_sequence_changed(self, mobile_seq):
-        self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
-        self.link_state_engine.set_mobile_sequence(mobile_seq)
-
-    def get_next_hops(self):
-        return self.routing_table_engine.get_next_hops()
-
-    def new_neighbor(self, rid, link_id, instance):
-        self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d inst=%r" % 
(rid, link_id, instance))
-        self.node_tracker.new_neighbor(rid, link_id, instance)
-        self.link_state_engine.new_neighbor(rid)
-
-    def lost_neighbor(self, rid):
-        self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
-        self.node_tracker.lost_neighbor(rid)
-
-    def new_node(self, rid, instance):
-        self.log(LOG_DEBUG, "Event: new_node: id=%s inst=%r" % (rid, instance))
-        self.node_tracker.new_node(rid, instance)
-
-    def lost_node(self, rid):
-        self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
-        self.node_tracker.lost_node(rid)
-
-    def touch_node(self, rid, instance):
-        return self.node_tracker.touch_node(rid, instance)
-
-    def add_neighbor_router(self, address, router_bit, link_bit):
-        self.log(LOG_DEBUG, "Event: add_neighbor_router: address=%s, 
router_bit=%d, link_bit=%d" % \
-                     (address, router_bit, link_bit))
-        self.router_adapter.add_neighbor_router(address, router_bit, link_bit)
-
-    def del_neighbor_router(self, router_id, router_bit):
-        self.log(LOG_DEBUG, "Event: del_neighbor_router: router_bit=%d" % 
router_bit)
-        self.link_state_engine.purge_remote(router_id)
-        self.mobile_address_engine.purge_remote(router_id)
-        self.router_adapter.del_neighbor_router(router_bit)
-
-    def add_remote_router(self, address, router_bit):
-        self.log(LOG_DEBUG, "Event: add_remote_router: address=%s, 
router_bit=%d" % (address, router_bit))
-        self.router_adapter.add_remote_router(address, router_bit)
-
-    def del_remote_router(self, router_id, router_bit):
-        self.log(LOG_DEBUG, "Event: del_remote_router: router_bit=%d" % 
router_bit)
-        self.link_state_engine.purge_remote(router_id)
-        self.mobile_address_engine.purge_remote(router_id)
-        self.router_adapter.del_remote_router(router_bit)
-

Copied: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py (from 
r1653545, qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py)
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py?p2=qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py&p1=qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py&r1=1653545&r2=1653556&rev=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py 
(original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py Wed Jan 
21 15:22:57 2015
@@ -18,38 +18,29 @@
 #
 
 from data import LinkState, MessageHELLO
-from dispatch import LOG_INFO
+from dispatch import LOG_INFO, LOG_TRACE
 
-class NeighborEngine(object):
+class HelloProtocol(object):
     """
-    This module is responsible for maintaining this router's link-state.  It 
runs the HELLO protocol
-    with the router's neighbors and notifies outbound when the list of 
neighbors-in-good-standing (the
-    link-state) changes.
+    This module is responsible for running the HELLO protocol.
     """
-    def __init__(self, container):
-        self.container = container
-        self.id = self.container.id
-        self.area = self.container.area
+    def __init__(self, container, node_tracker):
+        self.container       = container
+        self.node_tracker    = node_tracker
+        self.id              = self.container.id
         self.last_hello_time = 0.0
-        self.hello_interval = container.config.helloInterval
-        self.hello_max_age = container.config.helloMaxAge
-        self.hellos = {}
-        self.link_state_changed = False
-        self.link_state = LinkState(None, self.id, self.area, 0, [])
+        self.hello_interval  = container.config.helloInterval
+        self.hello_max_age   = container.config.helloMaxAge
+        self.hellos          = {}
 
 
     def tick(self, now):
         self._expire_hellos(now)
-
         if now - self.last_hello_time >= self.hello_interval:
             self.last_hello_time = now
-            self.container.send('amqp:/_local/qdhello', 
-                                MessageHELLO(None, self.id, self.area, 
self.hellos.keys(), self.container.instance))
-
-        if self.link_state_changed:
-            self.link_state_changed = False
-            self.link_state.bump_sequence()
-            self.container.local_link_state_changed(self.link_state)
+            msg = MessageHELLO(None, self.id, self.hellos.keys(), 
self.container.instance)
+            self.container.send('amqp:/_local/qdhello', msg)
+            self.container.log_hello(LOG_TRACE, "SENT: %r" % msg)
 
 
     def handle_hello(self, msg, now, link_id):
@@ -57,27 +48,16 @@ class NeighborEngine(object):
             return
         self.hellos[msg.id] = now
         if msg.is_seen(self.id):
-            if self.link_state.add_peer(msg.id):
-                self.link_state_changed = True
-                self.container.new_neighbor(msg.id, link_id, msg.instance)
-                self.container.log(LOG_INFO, "New neighbor established: %s on 
link: %d" % (msg.id, link_id))
-
-    def linkLost(self, link_id):
-        node_id = self.container.node_tracker.link_id_to_node_id(link_id)
-        if node_id:
-            self._delete_neighbor(node_id)
-
-    def _delete_neighbor(self, key):
-        self.hellos.pop(key)
-        if self.link_state.del_peer(key):
-            self.link_state_changed = True
-            self.container.lost_neighbor(key)
-            self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
+            self.node_tracker.neighbor_refresh(msg.id, msg.instance, link_id, 
now)
+
 
     def _expire_hellos(self, now):
-        to_delete = []
+        """
+        Expire local records of received hellos.  This is not involved in the
+        expiration of neighbor status for routers.
+        """
         for key, last_seen in self.hellos.items():
             if now - last_seen > self.hello_max_age:
-                to_delete.append(key)
-        for key in to_delete:
-            self._delete_neighbor(key)
+                self.hellos.pop(key)
+                self.container.log_hello(LOG_INFO, "HELLO peer expired: %s" % 
key)
+

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py Wed Jan 21 
15:22:57 2015
@@ -18,135 +18,61 @@
 #
 
 from data import MessageRA, MessageLSU, MessageLSR
-from dispatch import LOG_INFO
+from dispatch import LOG_INFO, LOG_TRACE
 
 class LinkStateEngine(object):
     """
-    This module is responsible for running the Link State protocol and 
maintaining the set
-    of link states that are gathered from the domain.  It notifies outbound 
when changes to
-    the link-state-collection are detected.
+    This module is responsible for running the Link State protocol.
     """
     def __init__(self, container):
         self.container = container
+        self.node_tracker = container.node_tracker
         self.id = self.container.id
-        self.area = self.container.area
         self.ra_interval = self.container.config.raInterval
-        self.remote_ls_max_age = self.container.config.remoteLsMaxAge
         self.last_ra_time = 0
-        self.collection = {}
-        self.collection_changed = False
-        self.mobile_seq = 0
-        self.needed_lsrs = {}
+        self.mobile_seq   = 0
 
 
-    def tick(self, now):
-        self._expire_ls(now)
-        self._send_lsrs()
+    def set_mobile_seq(self, mobile_seq):
+        self.mobile_seq = mobile_seq
 
-        if now - self.last_ra_time >= self.ra_interval:
-            self.last_ra_time = now
-            self._send_ra()
 
-        if self.collection_changed:
-            self.collection_changed = False
-            self.container.log(LOG_INFO, "New Link-State Collection:")
-            for a,b in self.collection.items():
-                self.container.log(LOG_INFO, "  %s => %r" % (a, b.peers))
-            self.container.ls_collection_changed(self.collection)
+    def tick(self, now):
+        if now - self.last_ra_time >= self.ra_interval:
+            self.send_ra(now)
 
 
     def handle_ra(self, msg, now):
         if msg.id == self.id:
             return
-        if msg.id in self.collection:
-            ls = self.collection[msg.id]
-            ls.last_seen = now
-            if ls.ls_seq < msg.ls_seq:
-                self.needed_lsrs[(msg.area, msg.id)] = None
-            if self.container.touch_node(msg.id, msg.instance):
-                self.needed_lsrs[(msg.area, msg.id)] = None
-        else:
-            self.needed_lsrs[(msg.area, msg.id)] = None
+        self.node_tracker.ra_received(msg.id, msg.ls_seq, msg.mobile_seq, 
msg.instance, now)
 
 
     def handle_lsu(self, msg, now):
         if msg.id == self.id:
             return
-        if msg.id in self.collection:
-            ls = self.collection[msg.id]
-            if ls.ls_seq < msg.ls_seq:
-                ls = msg.ls
-                self.collection[msg.id] = ls
-                self.collection_changed = True
-            ls.last_seen = now
-        else:
-            ls = msg.ls
-            self.collection[msg.id] = ls
-            self.collection_changed = True
-            ls.last_seen = now
-            self.container.new_node(msg.id, msg.instance)
-            self.container.log(LOG_INFO, "Learned link-state from new router: 
%s" % msg.id)
-        # Schedule LSRs for any routers referenced in this LS that we don't 
know about
-        for _id in msg.ls.peers:
-            if _id not in self.collection:
-                self.container.new_node(_id, None)
-                self.needed_lsrs[(msg.area, _id)] = None
+        self.node_tracker.link_state_received(msg.id, msg.ls, msg.instance, 
now)
 
 
     def handle_lsr(self, msg, now):
         if msg.id == self.id:
             return
-        if self.id not in self.collection:
-            self.needed_lsrs[(msg.area, msg.id)] = None
-            return
-        my_ls = self.collection[self.id]
-        self.container.send('amqp:/_topo/%s/%s/qdrouter' % (msg.area, msg.id), 
-                            MessageLSU(None, self.id, self.area, my_ls.ls_seq, 
my_ls, self.container.instance))
-
-
-    def new_neighbor(self, _id):
-        self.needed_lsrs[(self.area, _id)] = None
-
-
-    def new_local_link_state(self, link_state):
-        self.collection[self.id] = link_state
-        self.collection_changed = True
-        self._send_ra()
-
-
-    def set_mobile_sequence(self, seq):
-        self.mobile_seq = seq
-
-
-    def get_collection(self):
-        return self.collection
-
-
-    def purge_remote(self, _id):
-        try:
-            ls = self.collection[_id]
-            ls.del_all_peers()
-        except:
-            pass
-
-
-    def _expire_ls(self, now):
-        for key, ls in self.collection.items():
-            if key != self.id and ls.has_peers() and now - ls.last_seen > 
self.remote_ls_max_age:
-                ls.del_all_peers()
-                self.collection_changed = True
-                self.container.log(LOG_INFO, "Expired link-state from router: 
%s" % key)
-
-
-    def _send_lsrs(self):
-        for (_area, _id) in self.needed_lsrs.keys():
-            self.container.send('amqp:/_topo/%s/%s/qdrouter' % (_area, _id), 
MessageLSR(None, self.id, self.area))
-        self.needed_lsrs = {}
-
-
-    def _send_ra(self):
-        ls_seq = 0
-        if self.id in self.collection:
-            ls_seq = self.collection[self.id].ls_seq
-        self.container.send('amqp:/_topo/%s/all/qdrouter' % self.area,
-                            MessageRA(None, self.id, self.area, ls_seq, 
self.mobile_seq, self.container.instance))
+        self.node_tracker.router_learned(msg.id)
+        my_ls = self.node_tracker.link_state
+        smsg = MessageLSU(None, self.id, my_ls.ls_seq, my_ls, 
self.container.instance)
+        self.container.send('amqp:/_topo/%s/%s/qdrouter' % (msg.area, msg.id), 
smsg)
+        self.container.log_ls(LOG_TRACE, "SENT: %r" % smsg)
+
+
+    def send_lsr(self, _id):
+        msg = MessageLSR(None, self.id)
+        self.container.send('amqp:/_topo/0/%s/qdrouter' % _id, msg)
+        self.container.log_ls(LOG_TRACE, "SENT: %r to: %s" % (msg, _id))
+
+
+    def send_ra(self, now):
+        self.last_ra_time = now
+        ls_seq = self.node_tracker.link_state.ls_seq
+        msg = MessageRA(None, self.id, ls_seq, self.mobile_seq, 
self.container.instance)
+        self.container.send('amqp:/_topo/0/all/qdrouter', msg)
+        self.container.log_ls(LOG_TRACE, "SENT: %r" % msg)

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py 
(original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py Wed Jan 
21 15:22:57 2015
@@ -18,7 +18,9 @@
 #
 
 from data import MessageMAR, MessageMAU
-from dispatch import LOG_DEBUG
+from dispatch import LOG_DEBUG, LOG_TRACE
+
+MAX_KEPT_DELTAS = 10
 
 class MobileAddressEngine(object):
     """
@@ -28,38 +30,37 @@ class MobileAddressEngine(object):
     is directly bound.
     """
     def __init__(self, container, node_tracker):
-        self.container = container
-        self.node_tracker = node_tracker
-        self.id = self.container.id
-        self.area = self.container.area
-        self.mobile_addr_max_age = self.container.config.mobileAddrMaxAge
-        self.mobile_seq = 0
-        self.local_addrs = []
-        self.added_addrs = []
+        self.container     = container
+        self.node_tracker  = node_tracker
+        self.id            = self.container.id
+        self.mobile_seq    = 0
+        self.local_addrs   = []
+        self.added_addrs   = []
         self.deleted_addrs = []
-        self.remote_lists = {}      # map router_id => (sequence, list of 
addrs)
-        self.remote_last_seen = {}  # map router_id => time of last seen 
advertisement/update
-        self.needed_mars = {}
+        self.sent_deltas   = {}
 
 
     def tick(self, now):
-        self._expire_remotes(now)
-        self._send_mars()
-
         ##
         ## If local addrs have changed, collect the changes and send a MAU 
with the diffs
         ## Note: it is important that the differential-MAU be sent before a RA 
is sent
         ##
         if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
             self.mobile_seq += 1
-            self.container.send('amqp:/_topo/%s/all/qdrouter' % self.area,
-                                MessageMAU(None, self.id, self.area, 
self.mobile_seq, self.added_addrs, self.deleted_addrs))
+            msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs, 
self.deleted_addrs)
+
+            self.sent_deltas[self.mobile_seq] = msg
+            if len(self.sent_deltas) > MAX_KEPT_DELTAS:
+                self.sent_deltas.pop(self.mobile_seq - MAX_KEPT_DELTAS)
+
+            self.container.send('amqp:/_topo/0/all/qdrouter.ma', msg)
+            self.container.log_ma(LOG_TRACE, "SENT: %r" % msg)
             self.local_addrs.extend(self.added_addrs)
             for addr in self.deleted_addrs:
                 self.local_addrs.remove(addr)
-            self.added_addrs = []
+            self.added_addrs   = []
             self.deleted_addrs = []
-            self.container.mobile_sequence_changed(self.mobile_seq)
+        return self.mobile_seq
 
 
     def add_local_address(self, addr):
@@ -84,22 +85,6 @@ class MobileAddressEngine(object):
                 self.added_addrs.remove(addr)
 
 
-    def handle_ra(self, msg, now):
-        if msg.id == self.id:
-            return
-
-        if msg.mobile_seq == 0:
-            return
-
-        if msg.id in self.remote_lists:
-            _seq, _list = self.remote_lists[msg.id]
-            self.remote_last_seen[msg.id] = now
-            if _seq < msg.mobile_seq:
-                self.needed_mars[(msg.id, msg.area, _seq)] = None
-        else:
-            self.needed_mars[(msg.id, msg.area, 0)] = None
-
-
     def handle_mau(self, msg, now):
         ##
         ## If the MAU is differential, we can only use it if its sequence is 
exactly one greater
@@ -109,87 +94,69 @@ class MobileAddressEngine(object):
         ##
         if msg.id == self.id:
             return
+        node = self.node_tracker.router_node(msg.id)
 
         if msg.exist_list != None:
             ##
             ## Absolute MAU
             ##
-            if msg.id in self.remote_lists:
-                _seq, _list = self.remote_lists[msg.id]
-                if _seq >= msg.mobile_seq:  # ignore duplicates
-                    return
-            self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
-            self.remote_last_seen[msg.id] = now
-            (add_list, del_list) = 
self.node_tracker.overwrite_addresses(msg.id, msg.exist_list)
-            self._activate_remotes(msg.id, add_list, del_list)
+            if msg.mobile_seq == node.mobile_address_sequence:
+                return
+            node.mobile_address_sequence = msg.mobile_seq
+            node.overwrite_addresses(msg.exist_list)
         else:
             ##
             ## Differential MAU
             ##
-            if msg.id in self.remote_lists:
-                _seq, _list = self.remote_lists[msg.id]
-                if _seq == msg.mobile_seq:  # ignore duplicates
-                    return
-                self.remote_last_seen[msg.id] = now
-                if _seq + 1 == msg.mobile_seq:
-                    ##
-                    ## This is one greater than our stored value, incorporate 
the deltas
-                    ##
-                    if msg.add_list and msg.add_list.__class__ == list:
-                        _list.extend(msg.add_list)
-                    if msg.del_list and msg.del_list.__class__ == list:
-                        for addr in msg.del_list:
-                            _list.remove(addr)
-                    self.remote_lists[msg.id] = (msg.mobile_seq, _list)
-                    if msg.add_list != None:
-                        self.node_tracker.add_addresses(msg.id, msg.add_list)
-                    if msg.del_list != None:
-                        self.node_tracker.del_addresses(msg.id, msg.del_list)
-                    self._activate_remotes(msg.id, msg.add_list, msg.del_list)
-                else:
-                    self.needed_mars[(msg.id, msg.area, _seq)] = None
+            if node.mobile_address_sequence + 1 == msg.mobile_seq:
+                ##
+                ## This message represents the next expected sequence, 
incorporate the deltas
+                ##
+                node.mobile_address_sequence += 1
+                for a in msg.add_list:
+                    node.map_address(a)
+                for a in msg.del_list:
+                    node.unmap_address(a)
+
+            elif node.mobile_address_sequence == msg.mobile_seq:
+                ##
+                ## Ignore duplicates
+                ##
+                return
+
             else:
-                self.needed_mars[(msg.id, msg.area, 0)] = None
+                ##
+                ## This is an out-of-sequence delta.  Don't use it.  Schedule 
a MAR to
+                ## get back on track.
+                ##
+                node.mobile_address_request()
 
 
     def handle_mar(self, msg, now):
         if msg.id == self.id:
             return
-        if msg.have_seq < self.mobile_seq:
-            self.container.send('amqp:/_topo/%s/%s/qdrouter' % (msg.area, 
msg.id),
-                                MessageMAU(None, self.id, self.area, 
self.mobile_seq, None, None, self.local_addrs))
-
-
-    def purge_remote(self, _id):
-        try:
-            (add_list, del_list) = self.node_tracker.overwrite_addresses(_id, 
[])
-            self._activate_remotes(_id, add_list, del_list)
-            self.remote_lists.pop(_id)
-            self.remote_last_seen.pop(_id)
-            self.container.log(LOG_DEBUG, "Purged remote records for node: %s" 
% _id)
-        except:
-            pass
-
-
-    def _expire_remotes(self, now):
-        for _id, t in self.remote_last_seen.items():
-            if now - t > self.mobile_addr_max_age:
-                self.remote_lists.pop(_id)
-                self.remote_last_seen.pop(_id)
-                self.container.log(LOG_DEBUG, "Expired remote mobile addresses 
on node: %s" % _id)
-
-
-    def _send_mars(self):
-        for _id, _area, _seq in self.needed_mars.keys():
-            self.container.send('amqp:/_topo/%s/%s/qdrouter' % (_area, _id), 
MessageMAR(None, self.id, self.area, _seq))
-        self.needed_mars = {}
-
-
-    def _activate_remotes(self, _id, added, deleted):
-        bit = self.node_tracker.maskbit_for_node(_id)
-        if added != None:
-            for a in added:
-                self.container.router_adapter.map_destination(a[0], a[1:], bit)
-        if deleted != None:
-            for d in deleted:
-                self.container.router_adapter.unmap_destination(d[0], d[1:], 
bit)
+        if msg.have_seq == self.mobile_seq:
+            return
+        if self.mobile_seq - (msg.have_seq + 1) < len(self.sent_deltas):
+            ##
+            ## We can catch the peer up with a series of stored differential 
updates
+            ##
+            for s in range(msg.have_seq + 1, self.mobile_seq + 1):
+                self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, 
self.sent_deltas[s])
+                self.container.log_ma(LOG_TRACE, "SENT: %r" % 
self.sent_deltas[s])
+            return
+
+        ##
+        ## The peer needs to be sent an absolute update with the whole address 
list
+        ##
+        smsg = MessageMAU(None, self.id, self.mobile_seq, None, None, 
self.local_addrs)
+        self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, smsg)
+        self.container.log_ma(LOG_TRACE, "SENT: %r" % smsg)
+
+
+    def send_mar(self, node_id, seq):
+        msg = MessageMAR(None, self.id, seq)
+        self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % node_id, msg)
+        self.container.log_ma(LOG_TRACE, "SENT: %r" % msg)
+
+

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py Wed Jan 21 15:22:57 2015
@@ -17,6 +17,9 @@
 # under the License.
 #
 
+from dispatch import LOG_INFO, LOG_ERROR, LOG_TRACE
+from data import LinkState
+
 class NodeTracker(object):
     """
     This module is responsible for tracking the set of router nodes that are 
known to this
@@ -26,150 +29,270 @@ class NodeTracker(object):
     The mask bit is used in the main router to represent sets of valid 
destinations for addresses.
     """
     def __init__(self, container, max_routers):
-        self.container        = container
-        self.max_routers      = max_routers
-        self.nodes            = {}  # id => RemoteNode
-        self.nodes_by_link_id = {}  # link-id => node-id
-        self.maskbits         = []
-        self.next_maskbit     = 1   # Reserve bit '0' to represent this router
+        self.container             = container
+        self.my_id                 = container.id
+        self.max_routers           = max_routers
+        self.link_state            = LinkState(None, self.my_id, 0, [])
+        self.link_state_changed    = False
+        self.recompute_topology    = False
+        self.nodes                 = {}  # id => RouterNode
+        self.nodes_by_link_id      = {}  # link-id => node-id
+        self.maskbits              = []
+        self.next_maskbit          = 1   # Reserve bit '0' to represent this 
router
         for i in range(max_routers):
             self.maskbits.append(None)
-        self.maskbits[0] = True
+        self.maskbits[0]      = True
+        self.neighbor_max_age = self.container.config.helloMaxAge
+        self.ls_max_age       = self.container.config.remoteLsMaxAge
+
+
+    def _do_expirations(self, now):
+        """
+        Run through the list of routers and check for expired conditions
+        """
+        for node_id, node in self.nodes.items():
+            ##
+            ## If the node is a neighbor, check the neighbor refresh time to 
see
+            ## if we've waited too long for a refresh.  If so, disconnect the 
link
+            ## and remove the node from the local link state.
+            ##
+            if node.is_neighbor():
+                if now - node.neighbor_refresh_time > self.neighbor_max_age:
+                    node.remove_link()
+                    if self.link_state.del_peer(node_id):
+                        self.link_state_changed = True
+
+            ##
+            ## Check the age of the node's link state.  If it's too old, clear 
it out.
+            ##
+            if now - node.link_state.last_seen > self.ls_max_age:
+                if node.link_state.has_peers():
+                    node.link_state.del_all_peers()
+                    self.recompute_topology = True
+
+            ##
+            ## If the node has empty link state, check to see if it appears in 
any other
+            ## node's link state.  If it does not, then delete the node.
+            ##
+            if not node.link_state.has_peers() and not node.is_neighbor():
+                delete_node = True
+                for _id, _n in self.nodes.items():
+                    if _id != node_id:
+                        if _n.link_state.is_peer(node_id):
+                            delete_node = False
+                            break
+                if delete_node:
+                    ##
+                    ## The keep_alive_count is set to zero when a new node is 
first
+                    ## discovered.  Since we can learn about a node before we 
receive
+                    ## its link state, the keep_alive_count is used to prevent 
the
+                    ## node from being deleted before we can learn more about 
it.
+                    ##
+                    node.keep_alive_count += 1
+                    if node.keep_alive_count > 2:
+                        node.delete()
+                        self.nodes.pop(node_id)
 
 
     def tick(self, now):
-        pass
+        send_ra = False
 
+        ##
+        ## Expire neighbors and link state
+        ##
+        self._do_expirations(now)
+
+        ##
+        ## Handle local link state changes
+        ##
+        if self.link_state_changed:
+            self.link_state_changed = False
+            self.link_state.bump_sequence()
+            self.recompute_topology = True
+            send_ra = True
+            self.container.log_ls(LOG_TRACE, "Local Link State: %r" % 
self.link_state)
+
+        ##
+        ## Recompute the topology
+        ##
+        if self.recompute_topology:
+            self.recompute_topology = False
+            collection = {self.my_id : self.link_state}
+            for node_id, node in self.nodes.items():
+                collection[node_id] = node.link_state
+            next_hops, valid_origins = 
self.container.path_engine.calculate_routes(collection)
+            self.container.log_ls(LOG_TRACE, "Computed next hops: %r" % 
next_hops)
+            self.container.log_ls(LOG_TRACE, "Computed valid origins: %r" % 
valid_origins)
+
+            ##
+            ## Update the next hops and valid origins for each node
+            ##
+            for node_id, next_hop_id in next_hops.items():
+                node     = self.nodes[node_id]
+                next_hop = self.nodes[next_hop_id]
+                vo       = valid_origins[node_id]
+                node.set_next_hop(next_hop)
+                node.set_valid_origins(vo)
+
+        ##
+        ## Send link-state requests and mobile-address requests to the nodes
+        ## that have pending requests and are reachable
+        ##
+        for node_id, node in self.nodes.items():
+            if node.link_state_requested():
+                self.container.link_state_engine.send_lsr(node_id)
+            if node.mobile_address_requested():
+                self.container.mobile_address_engine.send_mar(node_id, 
node.mobile_address_sequence)
+
+        ##
+        ## If local changes have been made to the list of mobile addresses, 
send
+        ## an unsolicited mobile-address-update to all routers.
+        ##
+        mobile_seq = self.container.mobile_address_engine.tick(now)
+        self.container.link_state_engine.set_mobile_seq(mobile_seq)
+
+        ##
+        ## Send an immediate RA if our link state changed
+        ##
+        if send_ra:
+            self.container.link_state_engine.send_ra(now)
+
+
+    def neighbor_refresh(self, node_id, instance, link_id, now):
+        """
+        Invoked when the hello protocol has received positive confirmation
+        of continued bi-directional connectivity with a neighbor router.
+        """
+
+        ##
+        ## If the node id is not known, create a new RouterNode to track it.
+        ##
+        if node_id not in self.nodes:
+            self.nodes[node_id] = RouterNode(self, node_id, instance)
+        node = self.nodes[node_id]
 
-    def new_neighbor(self, node_id, link_maskbit, instance):
-        """
-        A node, designated by node_id, has been discovered as a neighbor over 
a link with
-        a maskbit of link_maskbit.
-        """
-        self.nodes_by_link_id[link_maskbit] = node_id
-        if node_id in self.nodes:
+        ##
+        ## Set the link_id to indicate this is a neighbor router.  If the 
link_id
+        ## changed, update the index and add the neighbor to the local link 
state.
+        ##
+        if node.set_link_id(link_id):
+            self.nodes_by_link_id[link_id] = node
+            node.request_link_state()
+            if self.link_state.add_peer(node_id):
+                self.link_state_changed = True
+
+        ##
+        ## Update the refresh time for later expiration checks
+        ##
+        node.neighbor_refresh_time = now
+
+        ##
+        ## If the instance was updated (i.e. the neighbor restarted suddenly),
+        ## schedule a topology recompute and a link-state-request to that 
router.
+        ##
+        if node.update_instance(instance):
+            self.recompute_topology = True
+            node.request_link_state()
+
+
+    def link_lost(self, link_id):
+        """
+        Invoked when an inter-router link is dropped.
+        """
+        self.container.log_ls(LOG_INFO, "Router Link Lost - link_id=%d" % 
link_id)
+        node_id = self.link_id_to_node_id(link_id)
+        if node_id:
+            self.nodes_by_link_id.pop(link_id)
             node = self.nodes[node_id]
-            if node.neighbor:
-                if node.update_instance(instance):
-                    self.container.del_neighbor_router(node.id, node.maskbit)
-                else:
-                    return
-            else:
-                self.container.del_remote_router(node.id, node.maskbit)
-                node.neighbor = True
-                node.link_id  = link_maskbit
-        else:
-            node = RemoteNode(node_id, self._allocate_maskbit(), True, 
link_maskbit, instance)
-            self.nodes[node_id] = node
-        self.container.add_neighbor_router(self._address(node_id), 
node.maskbit, link_maskbit)
+            node.remove_link()
+            if self.link_state.del_peer(node_id):
+                self.link_state_changed = True
 
 
-    def lost_neighbor(self, node_id):
+    def ra_received(self, node_id, ls_seq, mobile_seq, instance, now):
         """
-        We have lost contact with a neighboring node.
+        Invoked when a router advertisement is received from another router.
         """
+        ##
+        ## If the node id is not known, create a new RouterNode to track it.
+        ##
+        if node_id not in self.nodes:
+            self.nodes[node_id] = RouterNode(self, node_id, instance)
         node = self.nodes[node_id]
-        node.neighbor = False
-        self.nodes_by_link_id.pop(node.link_id)
-        node.link_id = None
-        self.container.del_neighbor_router(node.id, node.maskbit)
-        if node.remote:
-            self.container.add_remote_router(self._address(node.id), 
node.maskbit)
-        else:
-            self._free_maskbit(node.maskbit)
-            self.nodes.pop(node_id)
 
+        ##
+        ## If the instance was updated (i.e. the router restarted suddenly),
+        ## schedule a topology recompute and a link-state-request to that 
router.
+        ##
+        if node.update_instance(instance):
+            self.recompute_topology = True
+            node.request_link_state()
 
-    def new_node(self, node_id, instance):
-        """
-        A node, designated by node_id, has been discovered through the an 
advertisement from a
-        remote peer.
-        """
-        if node_id not in self.nodes:
-            node = RemoteNode(node_id, self._allocate_maskbit(), False, None, 
instance)
-            self.nodes[node_id] = node
-            self.container.add_remote_router(self._address(node.id), 
node.maskbit)
-        else:
-            node = self.nodes[node_id]
-            node.remote = True
-            if node.update_instance(instance):
-                self.container.del_remote_router(node.id, node.maskbit)
-                self.container.add_remote_router(self._address(node.id), 
node.maskbit)
+        ##
+        ## Update the last seen time to now to control expiration of the link 
state.
+        ##
+        node.link_state.last_seen = now
 
+        ##
+        ## Check the link state sequence.  Send a link state request if our 
records are
+        ## not up to date.
+        ##
+        if node.link_state.ls_seq < ls_seq:
+            self.container.link_state_engine.send_lsr(node_id)
 
-    def lost_node(self, node_id):
-        """
-        A remote node, node_id, has not been heard from for too long and is 
being deemed lost.
-        """
-        node = self.nodes[node_id]
-        if node.remote:
-            node.remote = False
-            if not node.neighbor:
-                self.container.del_remote_router(node.id, node.maskbit)
-                self._free_maskbit(node.maskbit)
-                self.nodes.pop(node_id)
+        ##
+        ## Check the mobile sequence.  Send a mobile-address-request if we are
+        ## behind the advertized sequence.
+        ##
+        if node.mobile_address_sequence < mobile_seq:
+            node.mobile_address_request()
 
 
-    def touch_node(self, node_id, instance):
+    def router_learned(self, node_id):
         """
-        We've received an advertisement or hello from a node.  If the instance 
has changed,
-        we need to treat the node as though it was lost and regained.
+        Invoked when we learn about another router by any means
         """
-        try:
-            node = self.nodes[node_id]
-            if node.update_instance(instance):
-                if node.neighbor:
-                    self.container.del_neighbor_router(node.id, node.maskbit)
-                    self.container.add_neighbor_router(self._address(node_id), 
node.maskbit, node.link_id)
-                elif node.remote:
-                    self.container.del_remote_router(node.id, node.maskbit)
-                    self.container.add_remote_router(self._address(node.id), 
node.maskbit)
-                return True
-        except:
-            pass
-        return False
+        if node_id not in self.nodes and node_id != self.my_id:
+            self.nodes[node_id] = RouterNode(self, node_id, None)
 
 
-    def maskbit_for_node(self, node_id):
+    def link_state_received(self, node_id, link_state, instance, now):
         """
+        Invoked when a link state update is received from another router.
         """
-        if node_id in self.nodes:
-            node = self.nodes[node_id]
-            return node.maskbit
-        return None
-
-
-    def add_addresses(self, node_id, addrs):
+        ##
+        ## If the node id is not known, create a new RouterNode to track it.
+        ##
+        if node_id not in self.nodes:
+            self.nodes[node_id] = RouterNode(self, node_id, instance)
         node = self.nodes[node_id]
-        for a in addrs:
-            node.addrs[a] = 1
 
+        ##
+        ## If the new link state is more up-to-date than the stored link state,
+        ## update it and schedule a topology recompute.
+        ##
+        if link_state.ls_seq > node.link_state.ls_seq:
+            node.link_state = link_state
+            node.link_state.last_seen = now
+            self.recompute_topology = True
+
+            ##
+            ## Look through the new link state for references to nodes that we 
don't
+            ## know about.  Schedule link state requests for those nodes to be 
sent
+            ## after we next recompute the topology.
+            ##
+            for peer in node.link_state.peers:
+                if peer not in self.nodes:
+                    self.router_learned(peer)
 
-    def del_addresses(self, node_id, addrs):
-        node = self.nodes[node_id]
-        for a in addrs:
-            node.addrs.pop(a)
 
-
-    def overwrite_addresses(self, node_id, addrs):
-        node    = self.nodes[node_id]
-        added   = []
-        deleted = []
-        for a in addrs:
-            if a not in node.addrs.keys():
-                added.append(a)
-        for a in node.addrs.keys():
-            if a not in addrs:
-                deleted.append(a)
-        node.addrs = {}
-        for a in addrs:
-            node.addrs[a] = 1
-        return (added, deleted)
+    def router_node(self, node_id):
+        return self.nodes[node_id]
 
 
     def link_id_to_node_id(self, link_id):
         if link_id in self.nodes_by_link_id:
-            return self.nodes_by_link_id[link_id]
+            return self.nodes_by_link_id[link_id].id
         return None
 
 
@@ -192,20 +315,148 @@ class NodeTracker(object):
             self.next_maskbit = i
 
 
-    def _address(self, node_id):
-        return "amqp:/_topo/%s/%s" % (self.container.area, node_id)
 
+class RouterNode(object):
+    """
+    RouterNode is used to track remote routers in the router network.
+    """
 
-class RemoteNode(object):
+    def __init__(self, parent, node_id, instance):
+        self.parent                  = parent
+        self.adapter                 = parent.container.router_adapter
+        self.log                     = parent.container.log
+        self.id                      = node_id
+        self.instance                = instance
+        self.maskbit                 = self.parent._allocate_maskbit()
+        self.neighbor_refresh_time   = 0.0
+        self.peer_link_id            = None
+        self.link_state              = LinkState(None, self.id, 0, [])
+        self.next_hop_router         = None
+        self.valid_origins           = None
+        self.mobile_addresses        = []
+        self.mobile_address_sequence = 0
+        self.need_ls_request         = True
+        self.need_mobile_request     = False
+        self.keep_alive_count        = 0
+        self.adapter.add_router("amqp:/_topo/0/%s/qdrouter" % self.id, 
self.maskbit)
+        self.log(LOG_TRACE, "Node %s created: maskbit=%d" % (self.id, 
self.maskbit))
+
+
+    def set_link_id(self, link_id):
+        if self.peer_link_id == link_id:
+            return False
+        self.peer_link_id = link_id
+        self.next_hop_router = None
+        self.adapter.set_link(self.maskbit, link_id)
+        self.adapter.remove_next_hop(self.maskbit)
+        self.log(LOG_TRACE, "Node %s link set: link_id=%r" % (self.id, 
link_id))
+        return True
+
+
+    def remove_link(self):
+        if self.peer_link_id != None:
+            self.peer_link_id = None
+            self.adapter.remove_link(self.maskbit)
+            self.log(LOG_TRACE, "Node %s link removed" % self.id)
 
-    def __init__(self, node_id, maskbit, neighbor, link_id, instance):
-        self.id       = node_id
-        self.maskbit  = maskbit
-        self.neighbor = neighbor
-        self.remote   = not neighbor
-        self.link_id  = link_id
-        self.instance = instance
-        self.addrs    = {}  # Address => Count at Node (1 only for the present)
+
+    def delete(self):
+        self.unmap_all_addresses()
+        self.adapter.del_router(self.maskbit)
+        self.parent._free_maskbit(self.maskbit)
+        self.log(LOG_TRACE, "Node %s deleted" % self.id)
+
+
+    def set_next_hop(self, next_hop):
+        if self.id == next_hop.id:
+            return
+        if self.next_hop_router and self.next_hop_router.id == next_hop.id:
+            return
+        self.next_hop_router = next_hop
+        self.adapter.set_next_hop(self.maskbit, next_hop.maskbit)
+        self.log(LOG_TRACE, "Node %s next hop set: %s" % (self.id, 
next_hop.id))
+
+
+    def set_valid_origins(self, valid_origins):
+        if self.valid_origins == valid_origins:
+            return
+        self.valid_origins = valid_origins
+        vo_mb = [self.parent.nodes[N].maskbit for N in valid_origins]
+        self.adapter.set_valid_origins(self.maskbit, vo_mb)
+        self.log(LOG_TRACE, "Node %s valid origins: %r" % (self.id, 
valid_origins))
+
+
+    def remove_next_hop(self):
+        if self.next_hop_router:
+            self.next_hop_router = None
+            self.adapter.remove_next_hop(self.maskbit)
+            self.log(LOG_TRACE, "Node %s next hop removed" % self.id)
+
+
+    def is_neighbor(self):
+        return self.peer_link_id != None
+
+
+    def request_link_state(self):
+        """
+        Set the link-state-requested flag so we can send this node a link-state
+        request at the most opportune time.
+        """
+        self.need_ls_request = True
+
+
+    def link_state_requested(self):
+        """
+        Return True iff we need to request this node's link state AND the node 
is
+        reachable.  There's no point in sending it a request if we don't know 
how to
+        reach it.
+        """
+        if self.need_ls_request and (self.peer_link_id != None or 
self.next_hop_router != None):
+            self.need_ls_request = False
+            return True
+        return False
+
+
+    def mobile_address_request(self):
+        self.need_mobile_request = True
+
+
+    def mobile_address_requested(self):
+        if self.need_mobile_request and (self.peer_link_id != None or 
self.next_hop_router != None):
+            self.need_mobile_request = False
+            return True
+        return False
+
+
+    def map_address(self, addr):
+        self.mobile_addresses.append(addr)
+        self.adapter.map_destination(addr[0], addr[1:], self.maskbit)
+
+
+    def unmap_address(self, addr):
+        self.mobile_addresses.remove(addr)
+        self.adapter.unmap_destination(addr[0], addr[1:], self.maskbit)
+
+
+    def unmap_all_addresses(self):
+        self.mobile_address_sequence = 0
+        for addr in self.mobile_addresses:
+            self.unmap_address(addr)
+
+
+    def overwrite_addresses(self, addrs):
+        added   = []
+        deleted = []
+        for a in addrs:
+            if a not in self.mobile_addresses:
+                added.append(a)
+        for a in self.mobile_addresses:
+            if a not in addrs:
+                deleted.append(a)
+        for a in added:
+            self.map_address(a)
+        for a in deleted:
+            self.unmap_address(a)
 
 
     def update_instance(self, instance):
@@ -216,5 +467,10 @@ class RemoteNode(object):
             return False
         if self.instance == instance:
             return False
+
         self.instance = instance
+        self.link_state.del_all_peers()
+        self.unmap_all_addresses()
+        self.log(LOG_TRACE, "Node %s detected restart" % self.id)
         return True
+

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py Wed Jan 21 15:22:57 2015
@@ -20,29 +20,15 @@
 
 class PathEngine(object):
     """
-    This module is responsible for computing the next-hop for every 
router/area in the domain
+    This module is responsible for computing the next-hop for every router in 
the domain
     based on the collection of link states that have been gathered.
     """
     def __init__(self, container):
         self.container = container
         self.id = self.container.id
-        self.area = self.container.area
-        self.recalculate = False
-        self.collection = None
 
 
-    def tick(self, now_unused):
-        if self.recalculate:
-            self.recalculate = False
-            self._calculate_routes()
-
-
-    def ls_collection_changed(self, collection):
-        self.recalculate = True
-        self.collection = collection
-
-
-    def _calculate_tree_from_root(self, root):
+    def _calculate_tree_from_root(self, root, collection):
         ##
         ## Make a copy of the current collection of link-states that contains
         ## a fake link-state for nodes that are known-peers but are not in the
@@ -50,7 +36,7 @@ class PathEngine(object):
         ## so we can trade link-state information with them.
         ##
         link_states = {}
-        for _id, ls in self.collection.items():
+        for _id, ls in collection.items():
             link_states[_id] = ls.peers
             for p in ls.peers:
                 if p not in link_states:
@@ -98,7 +84,7 @@ class PathEngine(object):
         return prev
 
 
-    def _calculate_valid_origins(self, nodeset):
+    def _calculate_valid_origins(self, nodeset, collection):
         ##
         ## Calculate the tree from each origin, determine the set of 
origins-per-dest
         ## for which the path from origin to dest passes through us.  This is 
the set
@@ -110,7 +96,7 @@ class PathEngine(object):
                 valid_origin[node] = []
 
         for root in valid_origin.keys():
-            prev  = self._calculate_tree_from_root(root)
+            prev  = self._calculate_tree_from_root(root, collection)
             nodes = prev.keys()
             while len(nodes) > 0:
                 u = nodes[0]
@@ -129,11 +115,11 @@ class PathEngine(object):
         return valid_origin
 
 
-    def _calculate_routes(self):
+    def calculate_routes(self, collection):
         ##
         ## Generate the shortest-path tree with the local node as root
         ##
-        prev  = self._calculate_tree_from_root(self.id)
+        prev  = self._calculate_tree_from_root(self.id, collection)
         nodes = prev.keys()
 
         ##
@@ -154,13 +140,12 @@ class PathEngine(object):
             for w in path:        # mark each node in the path as reachable 
via the next hop
                 next_hops[w] = u
 
-        self.container.next_hops_changed(next_hops)
-
         ##
         ## Calculate the valid origins for remote routers
         ##
-        valid_origin = self._calculate_valid_origins(prev.keys())
-        self.container.valid_origins_changed(valid_origin)
+        valid_origins = self._calculate_valid_origins(prev.keys(), collection)
+
+        return (next_hops, valid_origins)
 
 
 

Modified: qpid/dispatch/trunk/src/python_embedded.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/python_embedded.c?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/python_embedded.c (original)
+++ qpid/dispatch/trunk/src/python_embedded.c Wed Jan 21 15:22:57 2015
@@ -52,9 +52,11 @@ void qd_python_initialize(qd_dispatch_t
     ilock = sys_mutex();
     if (python_pkgdir)
         dispatch_python_pkgdir = PyString_FromString(python_pkgdir);
-    Py_Initialize();
 
+    qd_python_lock_state_t ls = qd_python_lock();
+    Py_Initialize();
     qd_python_setup();
+    qd_python_unlock(ls);
 }
 
 

Modified: qpid/dispatch/trunk/src/router_node.c
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Wed Jan 21 15:22:57 2015
@@ -1449,8 +1449,9 @@ qd_router_t *qd_router(qd_dispatch_t *qd
     // locally later in the initialization sequence.
     //
     if (router->router_mode == QD_ROUTER_MODE_INTERIOR) {
-        router->router_addr = qd_router_register_address(qd, "qdrouter", 0, 
QD_SEMANTICS_ROUTER_CONTROL, false, 0);
-        router->hello_addr  = qd_router_register_address(qd, "qdhello", 0, 
QD_SEMANTICS_ROUTER_CONTROL, false, 0);
+        router->router_addr   = qd_router_register_address(qd, "qdrouter", 0, 
QD_SEMANTICS_ROUTER_CONTROL, false, 0);
+        router->routerma_addr = qd_router_register_address(qd, "qdrouter.ma", 
0, QD_SEMANTICS_DEFAULT, false, 0);
+        router->hello_addr    = qd_router_register_address(qd, "qdhello", 0, 
QD_SEMANTICS_ROUTER_CONTROL, false, 0);
     }
 
     //

Modified: qpid/dispatch/trunk/src/router_private.h
URL: 
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Wed Jan 21 15:22:57 2015
@@ -219,6 +219,7 @@ struct qd_router_t {
     qd_address_list_t         addrs;
     qd_hash_t                *addr_hash;
     qd_address_t             *router_addr;
+    qd_address_t             *routerma_addr;
     qd_address_t             *hello_addr;
 
     qd_router_link_list_t     links;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to