Author: tross
Date: Wed Jan 21 15:22:57 2015
New Revision: 1653556
URL: http://svn.apache.org/r1653556
Log:
DISPATCH-97 - Major refactor and redesign of the Python Router module.
- Removed the distinction between neighbor and remote routers.
Now, routers either have a link (neighbor) or a next-hop (remote).
- Added a startup-sequence field to detect rapid-restart of a router.
- Request messages are not sent until there is a route to the destination.
- Mobile-address updates are now broadcast on a separate address that does not
bypass valid-origin checks. Only one copy is delivered to each router in the
network.
- Router logging has been divided into sub-protocols for better monitoring and
troubleshooting.
- A 6-node test configuration has been added.
Added:
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py
- copied, changed from r1653545,
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py
qpid/dispatch/trunk/tests/config-6/
qpid/dispatch/trunk/tests/config-6/A.conf
qpid/dispatch/trunk/tests/config-6/B.conf
qpid/dispatch/trunk/tests/config-6/C.conf
qpid/dispatch/trunk/tests/config-6/D.conf
qpid/dispatch/trunk/tests/config-6/X.conf
qpid/dispatch/trunk/tests/config-6/Y.conf
qpid/dispatch/trunk/tests/config-6/topology.txt
- copied, changed from r1650165,
qpid/dispatch/trunk/tests/config-3-linear/topology.txt
Removed:
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/routing.py
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/ctools.h
qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py
qpid/dispatch/trunk/src/python_embedded.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/router_private.h
qpid/dispatch/trunk/src/router_pynode.c
qpid/dispatch/trunk/tests/config-2/A.conf
qpid/dispatch/trunk/tests/config-2/B.conf
qpid/dispatch/trunk/tests/config-3-linear/A.conf
qpid/dispatch/trunk/tests/config-3-linear/B.conf
qpid/dispatch/trunk/tests/config-3-linear/C.conf
qpid/dispatch/trunk/tests/router_engine_test.py
Modified: qpid/dispatch/trunk/include/qpid/dispatch/ctools.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/ctools.h?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/ctools.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/ctools.h Wed Jan 21 15:22:57 2015
@@ -54,7 +54,7 @@
*@pre ptr points to first element of deq
*@post ptr points to first element of deq that passes test, or 0. Test should
involve ptr.
*/
-#define DEQ_FIND(ptr, test) while((ptr) && !(test)) ptr = DEQ_NEXT(ptr);
+#define DEQ_FIND(ptr,test) while((ptr) && !(test)) ptr = DEQ_NEXT(ptr);
#define DEQ_INSERT_HEAD(d,i) \
do { \
Modified: qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/qdrouter.json Wed Jan 21 15:22:57 2015
@@ -260,6 +260,9 @@
"module": {
"type":[
"ROUTER",
+ "ROUTER_HELLO",
+ "ROUTER_LS",
+ "ROUTER_MA",
"MESSAGE",
"SERVER",
"AGENT",
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/data.py Wed Jan 21 15:22:57 2015
@@ -46,16 +46,16 @@ class LinkState(object):
The link-state of a single router. The link state consists of a list of
neighbor routers reachable from
the reporting router. The link-state-sequence number is incremented each
time the link state changes.
"""
- def __init__(self, body, _id=None, _area=None, _ls_seq=None, _peers=None):
+ def __init__(self, body, _id=None, _ls_seq=None, _peers=None):
self.last_seen = 0
if body:
self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
+ self.area = '0'
self.ls_seq = getMandatory(body, 'ls_seq', long)
self.peers = getMandatory(body, 'peers', list)
else:
self.id = _id
- self.area = _area
+ self.area = '0'
self.ls_seq = long(_ls_seq)
self.peers = _peers
@@ -87,6 +87,9 @@ class LinkState(object):
def has_peers(self):
return len(self.peers) > 0
+ def is_peer(self, _id):
+ return _id in self.peers
+
def bump_sequence(self):
self.ls_seq += 1
@@ -98,15 +101,15 @@ class MessageHELLO(object):
This message is used by directly connected routers to determine with whom
they have
bidirectional connectivity.
"""
- def __init__(self, body, _id=None, _area=None, _seen_peers=None,
_instance=long(0)):
+ def __init__(self, body, _id=None, _seen_peers=None, _instance=long(0)):
if body:
self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
+ self.area = '0'
self.seen_peers = getMandatory(body, 'seen', list)
self.instance = getOptional(body, 'instance', 0, long)
else:
self.id = _id
- self.area = _area
+ self.area = '0'
self.seen_peers = _seen_peers
self.instance = _instance
@@ -133,16 +136,16 @@ class MessageRA(object):
This message is sent periodically to indicate the originating router's
sequence numbers
for link-state and mobile-address-state.
"""
- def __init__(self, body, _id=None, _area=None, _ls_seq=None,
_mobile_seq=None, _instance=long(0)):
+ def __init__(self, body, _id=None, _ls_seq=None, _mobile_seq=None,
_instance=long(0)):
if body:
self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
+ self.area = '0'
self.ls_seq = getMandatory(body, 'ls_seq', long)
self.mobile_seq = getMandatory(body, 'mobile_seq', long)
self.instance = getOptional(body, 'instance', 0, long)
else:
self.id = _id
- self.area = _area
+ self.area = '0'
self.ls_seq = long(_ls_seq)
self.mobile_seq = long(_mobile_seq)
self.instance = _instance
@@ -165,16 +168,16 @@ class MessageRA(object):
class MessageLSU(object):
"""
"""
- def __init__(self, body, _id=None, _area=None, _ls_seq=None, _ls=None,
_instance=long(0)):
+ def __init__(self, body, _id=None, _ls_seq=None, _ls=None,
_instance=long(0)):
if body:
self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
+ self.area = '0'
self.ls_seq = getMandatory(body, 'ls_seq', long)
self.ls = LinkState(getMandatory(body, 'ls', dict))
self.instance = getOptional(body, 'instance', 0, long)
else:
self.id = _id
- self.area = _area
+ self.area = '0'
self.ls_seq = long(_ls_seq)
self.ls = _ls
self.instance = _instance
@@ -197,13 +200,13 @@ class MessageLSU(object):
class MessageLSR(object):
"""
"""
- def __init__(self, body, _id=None, _area=None):
+ def __init__(self, body, _id=None):
if body:
self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
+ self.area = '0'
else:
self.id = _id
- self.area = _area
+ self.area = '0'
def get_opcode(self):
return 'LSR'
@@ -219,17 +222,17 @@ class MessageLSR(object):
class MessageMAU(object):
"""
"""
- def __init__(self, body, _id=None, _area=None, _seq=None, _add_list=None,
_del_list=None, _exist_list=None):
+ def __init__(self, body, _id=None, _seq=None, _add_list=None,
_del_list=None, _exist_list=None):
if body:
self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
+ self.area = '0'
self.mobile_seq = getMandatory(body, 'mobile_seq', long)
self.add_list = getOptional(body, 'add', None, list)
self.del_list = getOptional(body, 'del', None, list)
self.exist_list = getOptional(body, 'exist', None, list)
else:
self.id = _id
- self.area = _area
+ self.area = '0'
self.mobile_seq = long(_seq)
self.add_list = _add_list
self.del_list = _del_list
@@ -261,14 +264,14 @@ class MessageMAU(object):
class MessageMAR(object):
"""
"""
- def __init__(self, body, _id=None, _area=None, _have_seq=None):
+ def __init__(self, body, _id=None, _have_seq=None):
if body:
self.id = getMandatory(body, 'id', str)
- self.area = getMandatory(body, 'area', str)
+ self.area = '0'
self.have_seq = getMandatory(body, 'have_seq', long)
else:
self.id = _id
- self.area = _area
+ self.area = '0'
self.have_seq = long(_have_seq)
def get_opcode(self):
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/engine.py Wed Jan 21 15:22:57 2015
@@ -18,11 +18,10 @@
#
from data import MessageHELLO, MessageRA, MessageLSU, MessageMAU, MessageMAR,
MessageLSR
-from neighbor import NeighborEngine
+from hello import HelloProtocol
from link import LinkStateEngine
from path import PathEngine
from mobile import MobileAddressEngine
-from routing import RoutingTableEngine
from node import NodeTracker
from message import Message
@@ -40,7 +39,7 @@ class RouterEngine:
"""
"""
- def __init__(self, router_adapter, router_id, area, max_routers,
config_override={}):
+ def __init__(self, router_adapter, router_id, area_unused, max_routers,
config_override={}):
"""
Initialize an instance of a router for a domain.
"""
@@ -49,25 +48,28 @@ class RouterEngine:
##
self.domain = "domain"
self.router_adapter = router_adapter
- self.log_adapter = LogAdapter("ROUTER")
+ self._config = None # Not yet loaded
+ self._log_hello = LogAdapter("ROUTER_HELLO")
+ self._log_ls = LogAdapter("ROUTER_LS")
+ self._log_ma = LogAdapter("ROUTER_MA")
+ self._log_general = LogAdapter("ROUTER")
self.io_adapter = [IoAdapter(self.receive, "qdrouter"),
+ IoAdapter(self.receive, "qdrouter.ma"),
IoAdapter(self.receive, "qdhello")]
self.max_routers = max_routers
self.id = router_id
- self.area = area
self.instance = long(time.time())
- self.log(LOG_INFO, "Router Engine Instantiated: area=%s id=%s
instance=%d max_routers=%d" %
- (self.area, self.id, self.instance, self.max_routers))
- self._config = None # Not yet loaded
+ self.log(LOG_INFO, "Router Engine Instantiated: id=%s instance=%d
max_routers=%d" %
+ (self.id, self.instance, self.max_routers))
+
##
## Launch the sub-module engines
##
self.node_tracker = NodeTracker(self, self.max_routers)
- self.neighbor_engine = NeighborEngine(self)
+ self.hello_protocol = HelloProtocol(self, self.node_tracker)
self.link_state_engine = LinkStateEngine(self)
self.path_engine = PathEngine(self)
self.mobile_address_engine = MobileAddressEngine(self,
self.node_tracker)
- self.routing_table_engine = RoutingTableEngine(self,
self.node_tracker)
##========================================================================================
@@ -90,12 +92,10 @@ class RouterEngine:
"""
"""
try:
- if addr.find('Mtemp.') == 0: ## This is a temporary measure until
dynamic is added to Messenger
- return
if addr.find('M') == 0:
self.mobile_address_engine.add_local_address(addr[1:])
except Exception, e:
- self.log(LOG_ERROR, "Exception in new-address processing:
exception=%r" % e)
+ self.log_ma(LOG_ERROR, "Exception in new-address processing:
exception=%r" % e)
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback)
@@ -104,12 +104,10 @@ class RouterEngine:
"""
"""
try:
- if addr.find('Mtemp.') == 0:
- return
if addr.find('M') == 0:
self.mobile_address_engine.del_local_address(addr[1:])
except Exception, e:
- self.log(LOG_ERROR, "Exception in del-address processing:
exception=%r" % e)
+ self.log_ma(LOG_ERROR, "Exception in del-address processing:
exception=%r" % e)
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback)
@@ -117,8 +115,7 @@ class RouterEngine:
def linkLost(self, link_id):
"""
"""
- self.log(LOG_INFO, "Router Link Lost - link_id=%d" % link_id)
- self.neighbor_engine.linkLost(link_id)
+ self.node_tracker.link_lost(link_id)
def handleTimerTick(self):
@@ -126,11 +123,8 @@ class RouterEngine:
"""
try:
now = time.time()
- self.neighbor_engine.tick(now)
+ self.hello_protocol.tick(now)
self.link_state_engine.tick(now)
- self.path_engine.tick(now)
- self.mobile_address_engine.tick(now)
- self.routing_table_engine.tick(now)
self.node_tracker.tick(now)
except Exception, e:
self.log(LOG_ERROR, "Exception in timer processing: exception=%r"
% e)
@@ -145,33 +139,32 @@ class RouterEngine:
now = time.time()
if opcode == 'HELLO':
msg = MessageHELLO(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
- self.neighbor_engine.handle_hello(msg, now, link_id)
+ self.log_hello(LOG_TRACE, "RCVD: %r" % msg)
+ self.hello_protocol.handle_hello(msg, now, link_id)
elif opcode == 'RA':
msg = MessageRA(body)
- self.log(LOG_TRACE, "RCVD: %r" % msg)
+ self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
self.link_state_engine.handle_ra(msg, now)
- self.mobile_address_engine.handle_ra(msg, now)
elif opcode == 'LSU':
msg = MessageLSU(body)
- self.log(LOG_DEBUG, "RCVD: %r" % msg)
+ self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
self.link_state_engine.handle_lsu(msg, now)
elif opcode == 'LSR':
msg = MessageLSR(body)
- self.log(LOG_DEBUG, "RCVD: %r" % msg)
+ self.log_ls(LOG_TRACE, "RCVD: %r" % msg)
self.link_state_engine.handle_lsr(msg, now)
elif opcode == 'MAU':
msg = MessageMAU(body)
- self.log(LOG_DEBUG, "RCVD: %r" % msg)
+ self.log_ma(LOG_TRACE, "RCVD: %r" % msg)
self.mobile_address_engine.handle_mau(msg, now)
elif opcode == 'MAR':
msg = MessageMAR(body)
- self.log(LOG_DEBUG, "RCVD: %r" % msg)
+ self.log_ma(LOG_TRACE, "RCVD: %r" % msg)
self.mobile_address_engine.handle_mar(msg, now)
except Exception, e:
@@ -203,7 +196,6 @@ class RouterEngine:
'next-hops' : "Next hops to each known router"
}
if kind == 'link-state' : return
self.neighbor_engine.link_state.to_dict()
- if kind == 'next-hops' : return
self.routing_table_engine.next_hops
if kind == 'link-state-set' :
copy = {}
for _id,_ls in self.link_state_engine.collection.items():
@@ -221,7 +213,31 @@ class RouterEngine:
Emit a log message to the host's event log
"""
info = traceback.extract_stack(limit=2)[0] # Caller frame info
- self.log_adapter.log(level, text, info[0], info[1])
+ self._log_general.log(level, text, info[0], info[1])
+
+
+ def log_hello(self, level, text):
+ """
+ Emit a log message to the host's event log
+ """
+ info = traceback.extract_stack(limit=2)[0] # Caller frame info
+ self._log_hello.log(level, text, info[0], info[1])
+
+
+ def log_ls(self, level, text):
+ """
+ Emit a log message to the host's event log
+ """
+ info = traceback.extract_stack(limit=2)[0] # Caller frame info
+ self._log_ls.log(level, text, info[0], info[1])
+
+
+ def log_ma(self, level, text):
+ """
+ Emit a log message to the host's event log
+ """
+ info = traceback.extract_stack(limit=2)[0] # Caller frame info
+ self._log_ma.log(level, text, info[0], info[1])
def send(self, dest, msg):
@@ -230,7 +246,6 @@ class RouterEngine:
"""
app_props = {'opcode' : msg.get_opcode() }
self.io_adapter[0].send(Message(address=dest, properties=app_props,
body=msg.to_dict()))
- self.log(LOG_TRACE, "SENT: %r dest=%s" % (msg, dest))
def node_updated(self, addr, reachable, neighbor):
@@ -238,71 +253,3 @@ class RouterEngine:
"""
self.router_adapter(addr, reachable, neighbor)
-
-
##========================================================================================
- ## Interconnect between the Sub-Modules
-
##========================================================================================
- def local_link_state_changed(self, link_state):
- self.log(LOG_DEBUG, "Event: local_link_state_changed: %r" % link_state)
- self.link_state_engine.new_local_link_state(link_state)
-
- def ls_collection_changed(self, collection):
- self.log(LOG_DEBUG, "Event: ls_collection_changed: %r" % collection)
- self.path_engine.ls_collection_changed(collection)
-
- def next_hops_changed(self, next_hop_table):
- self.log(LOG_DEBUG, "Event: next_hops_changed: %r" % next_hop_table)
- self.routing_table_engine.next_hops_changed(next_hop_table)
-
- def valid_origins_changed(self, valid_origins):
- self.log(LOG_DEBUG, "Event: valid_origins_changed: %r" % valid_origins)
- self.routing_table_engine.valid_origins_changed(valid_origins)
-
- def mobile_sequence_changed(self, mobile_seq):
- self.log(LOG_DEBUG, "Event: mobile_sequence_changed: %d" % mobile_seq)
- self.link_state_engine.set_mobile_sequence(mobile_seq)
-
- def get_next_hops(self):
- return self.routing_table_engine.get_next_hops()
-
- def new_neighbor(self, rid, link_id, instance):
- self.log(LOG_DEBUG, "Event: new_neighbor: id=%s link_id=%d inst=%r" %
(rid, link_id, instance))
- self.node_tracker.new_neighbor(rid, link_id, instance)
- self.link_state_engine.new_neighbor(rid)
-
- def lost_neighbor(self, rid):
- self.log(LOG_DEBUG, "Event: lost_neighbor: id=%s" % rid)
- self.node_tracker.lost_neighbor(rid)
-
- def new_node(self, rid, instance):
- self.log(LOG_DEBUG, "Event: new_node: id=%s inst=%r" % (rid, instance))
- self.node_tracker.new_node(rid, instance)
-
- def lost_node(self, rid):
- self.log(LOG_DEBUG, "Event: lost_node: id=%s" % rid)
- self.node_tracker.lost_node(rid)
-
- def touch_node(self, rid, instance):
- return self.node_tracker.touch_node(rid, instance)
-
- def add_neighbor_router(self, address, router_bit, link_bit):
- self.log(LOG_DEBUG, "Event: add_neighbor_router: address=%s,
router_bit=%d, link_bit=%d" % \
- (address, router_bit, link_bit))
- self.router_adapter.add_neighbor_router(address, router_bit, link_bit)
-
- def del_neighbor_router(self, router_id, router_bit):
- self.log(LOG_DEBUG, "Event: del_neighbor_router: router_bit=%d" %
router_bit)
- self.link_state_engine.purge_remote(router_id)
- self.mobile_address_engine.purge_remote(router_id)
- self.router_adapter.del_neighbor_router(router_bit)
-
- def add_remote_router(self, address, router_bit):
- self.log(LOG_DEBUG, "Event: add_remote_router: address=%s,
router_bit=%d" % (address, router_bit))
- self.router_adapter.add_remote_router(address, router_bit)
-
- def del_remote_router(self, router_id, router_bit):
- self.log(LOG_DEBUG, "Event: del_remote_router: router_bit=%d" %
router_bit)
- self.link_state_engine.purge_remote(router_id)
- self.mobile_address_engine.purge_remote(router_id)
- self.router_adapter.del_remote_router(router_bit)
-
Copied: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py (from
r1653545, qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py)
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py?p2=qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py&p1=qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py&r1=1653545&r2=1653556&rev=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/neighbor.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/hello.py Wed Jan 21 15:22:57 2015
@@ -18,38 +18,29 @@
#
from data import LinkState, MessageHELLO
-from dispatch import LOG_INFO
+from dispatch import LOG_INFO, LOG_TRACE
-class NeighborEngine(object):
+class HelloProtocol(object):
"""
- This module is responsible for maintaining this router's link-state. It
runs the HELLO protocol
- with the router's neighbors and notifies outbound when the list of
neighbors-in-good-standing (the
- link-state) changes.
+ This module is responsible for running the HELLO protocol.
"""
- def __init__(self, container):
- self.container = container
- self.id = self.container.id
- self.area = self.container.area
+ def __init__(self, container, node_tracker):
+ self.container = container
+ self.node_tracker = node_tracker
+ self.id = self.container.id
self.last_hello_time = 0.0
- self.hello_interval = container.config.helloInterval
- self.hello_max_age = container.config.helloMaxAge
- self.hellos = {}
- self.link_state_changed = False
- self.link_state = LinkState(None, self.id, self.area, 0, [])
+ self.hello_interval = container.config.helloInterval
+ self.hello_max_age = container.config.helloMaxAge
+ self.hellos = {}
def tick(self, now):
self._expire_hellos(now)
-
if now - self.last_hello_time >= self.hello_interval:
self.last_hello_time = now
- self.container.send('amqp:/_local/qdhello',
- MessageHELLO(None, self.id, self.area,
self.hellos.keys(), self.container.instance))
-
- if self.link_state_changed:
- self.link_state_changed = False
- self.link_state.bump_sequence()
- self.container.local_link_state_changed(self.link_state)
+ msg = MessageHELLO(None, self.id, self.hellos.keys(),
self.container.instance)
+ self.container.send('amqp:/_local/qdhello', msg)
+ self.container.log_hello(LOG_TRACE, "SENT: %r" % msg)
def handle_hello(self, msg, now, link_id):
@@ -57,27 +48,16 @@ class NeighborEngine(object):
return
self.hellos[msg.id] = now
if msg.is_seen(self.id):
- if self.link_state.add_peer(msg.id):
- self.link_state_changed = True
- self.container.new_neighbor(msg.id, link_id, msg.instance)
- self.container.log(LOG_INFO, "New neighbor established: %s on
link: %d" % (msg.id, link_id))
-
- def linkLost(self, link_id):
- node_id = self.container.node_tracker.link_id_to_node_id(link_id)
- if node_id:
- self._delete_neighbor(node_id)
-
- def _delete_neighbor(self, key):
- self.hellos.pop(key)
- if self.link_state.del_peer(key):
- self.link_state_changed = True
- self.container.lost_neighbor(key)
- self.container.log(LOG_INFO, "Neighbor lost: %s" % key)
+ self.node_tracker.neighbor_refresh(msg.id, msg.instance, link_id,
now)
+
def _expire_hellos(self, now):
- to_delete = []
+ """
+ Expire local records of received hellos. This is not involved in the
+ expiration of neighbor status for routers.
+ """
for key, last_seen in self.hellos.items():
if now - last_seen > self.hello_max_age:
- to_delete.append(key)
- for key in to_delete:
- self._delete_neighbor(key)
+ self.hellos.pop(key)
+ self.container.log_hello(LOG_INFO, "HELLO peer expired: %s" %
key)
+
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/link.py Wed Jan 21 15:22:57 2015
@@ -18,135 +18,61 @@
#
from data import MessageRA, MessageLSU, MessageLSR
-from dispatch import LOG_INFO
+from dispatch import LOG_INFO, LOG_TRACE
class LinkStateEngine(object):
"""
- This module is responsible for running the Link State protocol and
maintaining the set
- of link states that are gathered from the domain. It notifies outbound
when changes to
- the link-state-collection are detected.
+ This module is responsible for running the Link State protocol.
"""
def __init__(self, container):
self.container = container
+ self.node_tracker = container.node_tracker
self.id = self.container.id
- self.area = self.container.area
self.ra_interval = self.container.config.raInterval
- self.remote_ls_max_age = self.container.config.remoteLsMaxAge
self.last_ra_time = 0
- self.collection = {}
- self.collection_changed = False
- self.mobile_seq = 0
- self.needed_lsrs = {}
+ self.mobile_seq = 0
- def tick(self, now):
- self._expire_ls(now)
- self._send_lsrs()
+ def set_mobile_seq(self, mobile_seq):
+ self.mobile_seq = mobile_seq
- if now - self.last_ra_time >= self.ra_interval:
- self.last_ra_time = now
- self._send_ra()
- if self.collection_changed:
- self.collection_changed = False
- self.container.log(LOG_INFO, "New Link-State Collection:")
- for a,b in self.collection.items():
- self.container.log(LOG_INFO, " %s => %r" % (a, b.peers))
- self.container.ls_collection_changed(self.collection)
+ def tick(self, now):
+ if now - self.last_ra_time >= self.ra_interval:
+ self.send_ra(now)
def handle_ra(self, msg, now):
if msg.id == self.id:
return
- if msg.id in self.collection:
- ls = self.collection[msg.id]
- ls.last_seen = now
- if ls.ls_seq < msg.ls_seq:
- self.needed_lsrs[(msg.area, msg.id)] = None
- if self.container.touch_node(msg.id, msg.instance):
- self.needed_lsrs[(msg.area, msg.id)] = None
- else:
- self.needed_lsrs[(msg.area, msg.id)] = None
+ self.node_tracker.ra_received(msg.id, msg.ls_seq, msg.mobile_seq,
msg.instance, now)
def handle_lsu(self, msg, now):
if msg.id == self.id:
return
- if msg.id in self.collection:
- ls = self.collection[msg.id]
- if ls.ls_seq < msg.ls_seq:
- ls = msg.ls
- self.collection[msg.id] = ls
- self.collection_changed = True
- ls.last_seen = now
- else:
- ls = msg.ls
- self.collection[msg.id] = ls
- self.collection_changed = True
- ls.last_seen = now
- self.container.new_node(msg.id, msg.instance)
- self.container.log(LOG_INFO, "Learned link-state from new router:
%s" % msg.id)
- # Schedule LSRs for any routers referenced in this LS that we don't
know about
- for _id in msg.ls.peers:
- if _id not in self.collection:
- self.container.new_node(_id, None)
- self.needed_lsrs[(msg.area, _id)] = None
+ self.node_tracker.link_state_received(msg.id, msg.ls, msg.instance,
now)
def handle_lsr(self, msg, now):
if msg.id == self.id:
return
- if self.id not in self.collection:
- self.needed_lsrs[(msg.area, msg.id)] = None
- return
- my_ls = self.collection[self.id]
- self.container.send('amqp:/_topo/%s/%s/qdrouter' % (msg.area, msg.id),
- MessageLSU(None, self.id, self.area, my_ls.ls_seq,
my_ls, self.container.instance))
-
-
- def new_neighbor(self, _id):
- self.needed_lsrs[(self.area, _id)] = None
-
-
- def new_local_link_state(self, link_state):
- self.collection[self.id] = link_state
- self.collection_changed = True
- self._send_ra()
-
-
- def set_mobile_sequence(self, seq):
- self.mobile_seq = seq
-
-
- def get_collection(self):
- return self.collection
-
-
- def purge_remote(self, _id):
- try:
- ls = self.collection[_id]
- ls.del_all_peers()
- except:
- pass
-
-
- def _expire_ls(self, now):
- for key, ls in self.collection.items():
- if key != self.id and ls.has_peers() and now - ls.last_seen >
self.remote_ls_max_age:
- ls.del_all_peers()
- self.collection_changed = True
- self.container.log(LOG_INFO, "Expired link-state from router:
%s" % key)
-
-
- def _send_lsrs(self):
- for (_area, _id) in self.needed_lsrs.keys():
- self.container.send('amqp:/_topo/%s/%s/qdrouter' % (_area, _id),
MessageLSR(None, self.id, self.area))
- self.needed_lsrs = {}
-
-
- def _send_ra(self):
- ls_seq = 0
- if self.id in self.collection:
- ls_seq = self.collection[self.id].ls_seq
- self.container.send('amqp:/_topo/%s/all/qdrouter' % self.area,
- MessageRA(None, self.id, self.area, ls_seq,
self.mobile_seq, self.container.instance))
+ self.node_tracker.router_learned(msg.id)
+ my_ls = self.node_tracker.link_state
+ smsg = MessageLSU(None, self.id, my_ls.ls_seq, my_ls,
self.container.instance)
+ self.container.send('amqp:/_topo/%s/%s/qdrouter' % (msg.area, msg.id),
smsg)
+ self.container.log_ls(LOG_TRACE, "SENT: %r" % smsg)
+
+
+ def send_lsr(self, _id):
+ msg = MessageLSR(None, self.id)
+ self.container.send('amqp:/_topo/0/%s/qdrouter' % _id, msg)
+ self.container.log_ls(LOG_TRACE, "SENT: %r to: %s" % (msg, _id))
+
+
+ def send_ra(self, now):
+ self.last_ra_time = now
+ ls_seq = self.node_tracker.link_state.ls_seq
+ msg = MessageRA(None, self.id, ls_seq, self.mobile_seq,
self.container.instance)
+ self.container.send('amqp:/_topo/0/all/qdrouter', msg)
+ self.container.log_ls(LOG_TRACE, "SENT: %r" % msg)
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py Wed Jan 21 15:22:57 2015
@@ -18,7 +18,9 @@
#
from data import MessageMAR, MessageMAU
-from dispatch import LOG_DEBUG
+from dispatch import LOG_DEBUG, LOG_TRACE
+
+MAX_KEPT_DELTAS = 10
class MobileAddressEngine(object):
"""
@@ -28,38 +30,37 @@ class MobileAddressEngine(object):
is directly bound.
"""
def __init__(self, container, node_tracker):
- self.container = container
- self.node_tracker = node_tracker
- self.id = self.container.id
- self.area = self.container.area
- self.mobile_addr_max_age = self.container.config.mobileAddrMaxAge
- self.mobile_seq = 0
- self.local_addrs = []
- self.added_addrs = []
+ self.container = container
+ self.node_tracker = node_tracker
+ self.id = self.container.id
+ self.mobile_seq = 0
+ self.local_addrs = []
+ self.added_addrs = []
self.deleted_addrs = []
- self.remote_lists = {} # map router_id => (sequence, list of
addrs)
- self.remote_last_seen = {} # map router_id => time of last seen
advertisement/update
- self.needed_mars = {}
+ self.sent_deltas = {}
def tick(self, now):
- self._expire_remotes(now)
- self._send_mars()
-
##
## If local addrs have changed, collect the changes and send a MAU
with the diffs
## Note: it is important that the differential-MAU be sent before a RA
is sent
##
if len(self.added_addrs) > 0 or len(self.deleted_addrs) > 0:
self.mobile_seq += 1
- self.container.send('amqp:/_topo/%s/all/qdrouter' % self.area,
- MessageMAU(None, self.id, self.area,
self.mobile_seq, self.added_addrs, self.deleted_addrs))
+ msg = MessageMAU(None, self.id, self.mobile_seq, self.added_addrs,
self.deleted_addrs)
+
+ self.sent_deltas[self.mobile_seq] = msg
+ if len(self.sent_deltas) > MAX_KEPT_DELTAS:
+ self.sent_deltas.pop(self.mobile_seq - MAX_KEPT_DELTAS)
+
+ self.container.send('amqp:/_topo/0/all/qdrouter.ma', msg)
+ self.container.log_ma(LOG_TRACE, "SENT: %r" % msg)
self.local_addrs.extend(self.added_addrs)
for addr in self.deleted_addrs:
self.local_addrs.remove(addr)
- self.added_addrs = []
+ self.added_addrs = []
self.deleted_addrs = []
- self.container.mobile_sequence_changed(self.mobile_seq)
+ return self.mobile_seq
def add_local_address(self, addr):
@@ -84,22 +85,6 @@ class MobileAddressEngine(object):
self.added_addrs.remove(addr)
- def handle_ra(self, msg, now):
- if msg.id == self.id:
- return
-
- if msg.mobile_seq == 0:
- return
-
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- self.remote_last_seen[msg.id] = now
- if _seq < msg.mobile_seq:
- self.needed_mars[(msg.id, msg.area, _seq)] = None
- else:
- self.needed_mars[(msg.id, msg.area, 0)] = None
-
-
def handle_mau(self, msg, now):
##
## If the MAU is differential, we can only use it if its sequence is
exactly one greater
@@ -109,87 +94,69 @@ class MobileAddressEngine(object):
##
if msg.id == self.id:
return
+ node = self.node_tracker.router_node(msg.id)
if msg.exist_list != None:
##
## Absolute MAU
##
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- if _seq >= msg.mobile_seq: # ignore duplicates
- return
- self.remote_lists[msg.id] = (msg.mobile_seq, msg.exist_list)
- self.remote_last_seen[msg.id] = now
- (add_list, del_list) =
self.node_tracker.overwrite_addresses(msg.id, msg.exist_list)
- self._activate_remotes(msg.id, add_list, del_list)
+ if msg.mobile_seq == node.mobile_address_sequence:
+ return
+ node.mobile_address_sequence = msg.mobile_seq
+ node.overwrite_addresses(msg.exist_list)
else:
##
## Differential MAU
##
- if msg.id in self.remote_lists:
- _seq, _list = self.remote_lists[msg.id]
- if _seq == msg.mobile_seq: # ignore duplicates
- return
- self.remote_last_seen[msg.id] = now
- if _seq + 1 == msg.mobile_seq:
- ##
- ## This is one greater than our stored value, incorporate
the deltas
- ##
- if msg.add_list and msg.add_list.__class__ == list:
- _list.extend(msg.add_list)
- if msg.del_list and msg.del_list.__class__ == list:
- for addr in msg.del_list:
- _list.remove(addr)
- self.remote_lists[msg.id] = (msg.mobile_seq, _list)
- if msg.add_list != None:
- self.node_tracker.add_addresses(msg.id, msg.add_list)
- if msg.del_list != None:
- self.node_tracker.del_addresses(msg.id, msg.del_list)
- self._activate_remotes(msg.id, msg.add_list, msg.del_list)
- else:
- self.needed_mars[(msg.id, msg.area, _seq)] = None
+ if node.mobile_address_sequence + 1 == msg.mobile_seq:
+ ##
+ ## This message represents the next expected sequence, incorporate the deltas
+ ##
+ node.mobile_address_sequence += 1
+ for a in msg.add_list:
+ node.map_address(a)
+ for a in msg.del_list:
+ node.unmap_address(a)
+
+ elif node.mobile_address_sequence == msg.mobile_seq:
+ ##
+ ## Ignore duplicates
+ ##
+ return
+
else:
- self.needed_mars[(msg.id, msg.area, 0)] = None
+ ##
+ ## This is an out-of-sequence delta. Don't use it. Schedule a MAR to
+ ## get back on track.
+ ##
+ node.mobile_address_request()
def handle_mar(self, msg, now):
if msg.id == self.id:
return
- if msg.have_seq < self.mobile_seq:
- self.container.send('amqp:/_topo/%s/%s/qdrouter' % (msg.area,
msg.id),
- MessageMAU(None, self.id, self.area,
self.mobile_seq, None, None, self.local_addrs))
-
-
- def purge_remote(self, _id):
- try:
- (add_list, del_list) = self.node_tracker.overwrite_addresses(_id,
[])
- self._activate_remotes(_id, add_list, del_list)
- self.remote_lists.pop(_id)
- self.remote_last_seen.pop(_id)
- self.container.log(LOG_DEBUG, "Purged remote records for node: %s"
% _id)
- except:
- pass
-
-
- def _expire_remotes(self, now):
- for _id, t in self.remote_last_seen.items():
- if now - t > self.mobile_addr_max_age:
- self.remote_lists.pop(_id)
- self.remote_last_seen.pop(_id)
- self.container.log(LOG_DEBUG, "Expired remote mobile addresses
on node: %s" % _id)
-
-
- def _send_mars(self):
- for _id, _area, _seq in self.needed_mars.keys():
- self.container.send('amqp:/_topo/%s/%s/qdrouter' % (_area, _id),
MessageMAR(None, self.id, self.area, _seq))
- self.needed_mars = {}
-
-
- def _activate_remotes(self, _id, added, deleted):
- bit = self.node_tracker.maskbit_for_node(_id)
- if added != None:
- for a in added:
- self.container.router_adapter.map_destination(a[0], a[1:], bit)
- if deleted != None:
- for d in deleted:
- self.container.router_adapter.unmap_destination(d[0], d[1:],
bit)
+ if msg.have_seq == self.mobile_seq:
+ return
+ if self.mobile_seq - (msg.have_seq + 1) < len(self.sent_deltas):
+ ##
+ ## We can catch the peer up with a series of stored differential updates
+ ##
+ for s in range(msg.have_seq + 1, self.mobile_seq + 1):
+ self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id,
self.sent_deltas[s])
+ self.container.log_ma(LOG_TRACE, "SENT: %r" %
self.sent_deltas[s])
+ return
+
+ ##
+ ## The peer needs to be sent an absolute update with the whole address list
+ ##
+ smsg = MessageMAU(None, self.id, self.mobile_seq, None, None,
self.local_addrs)
+ self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % msg.id, smsg)
+ self.container.log_ma(LOG_TRACE, "SENT: %r" % smsg)
+
+
+ def send_mar(self, node_id, seq):
+ msg = MessageMAR(None, self.id, seq)
+ self.container.send('amqp:/_topo/0/%s/qdrouter.ma' % node_id, msg)
+ self.container.log_ma(LOG_TRACE, "SENT: %r" % msg)
+
+
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/node.py Wed Jan 21
15:22:57 2015
@@ -17,6 +17,9 @@
# under the License.
#
+from dispatch import LOG_INFO, LOG_ERROR, LOG_TRACE
+from data import LinkState
+
class NodeTracker(object):
"""
This module is responsible for tracking the set of router nodes that are known to this
@@ -26,150 +29,270 @@ class NodeTracker(object):
The mask bit is used in the main router to represent sets of valid destinations for addresses.
"""
def __init__(self, container, max_routers):
- self.container = container
- self.max_routers = max_routers
- self.nodes = {} # id => RemoteNode
- self.nodes_by_link_id = {} # link-id => node-id
- self.maskbits = []
- self.next_maskbit = 1 # Reserve bit '0' to represent this router
+ self.container = container
+ self.my_id = container.id
+ self.max_routers = max_routers
+ self.link_state = LinkState(None, self.my_id, 0, [])
+ self.link_state_changed = False
+ self.recompute_topology = False
+ self.nodes = {} # id => RouterNode
+ self.nodes_by_link_id = {} # link-id => node-id
+ self.maskbits = []
+ self.next_maskbit = 1 # Reserve bit '0' to represent this
router
for i in range(max_routers):
self.maskbits.append(None)
- self.maskbits[0] = True
+ self.maskbits[0] = True
+ self.neighbor_max_age = self.container.config.helloMaxAge
+ self.ls_max_age = self.container.config.remoteLsMaxAge
+
+
+ def _do_expirations(self, now):
+ """
+ Run through the list of routers and check for expired conditions
+ """
+ for node_id, node in self.nodes.items():
+ ##
+ ## If the node is a neighbor, check the neighbor refresh time to see
+ ## if we've waited too long for a refresh. If so, disconnect the link
+ ## and remove the node from the local link state.
+ ##
+ if node.is_neighbor():
+ if now - node.neighbor_refresh_time > self.neighbor_max_age:
+ node.remove_link()
+ if self.link_state.del_peer(node_id):
+ self.link_state_changed = True
+
+ ##
+ ## Check the age of the node's link state. If it's too old, clear it out.
+ ##
+ if now - node.link_state.last_seen > self.ls_max_age:
+ if node.link_state.has_peers():
+ node.link_state.del_all_peers()
+ self.recompute_topology = True
+
+ ##
+ ## If the node has empty link state, check to see if it appears in any other
+ ## node's link state. If it does not, then delete the node.
+ ##
+ if not node.link_state.has_peers() and not node.is_neighbor():
+ delete_node = True
+ for _id, _n in self.nodes.items():
+ if _id != node_id:
+ if _n.link_state.is_peer(node_id):
+ delete_node = False
+ break
+ if delete_node:
+ ##
+ ## The keep_alive_count is set to zero when a new node is first
+ ## discovered. Since we can learn about a node before we receive
+ ## its link state, the keep_alive_count is used to prevent the
+ ## node from being deleted before we can learn more about it.
+ ##
+ node.keep_alive_count += 1
+ if node.keep_alive_count > 2:
+ node.delete()
+ self.nodes.pop(node_id)
def tick(self, now):
- pass
+ send_ra = False
+ ##
+ ## Expire neighbors and link state
+ ##
+ self._do_expirations(now)
+
+ ##
+ ## Handle local link state changes
+ ##
+ if self.link_state_changed:
+ self.link_state_changed = False
+ self.link_state.bump_sequence()
+ self.recompute_topology = True
+ send_ra = True
+ self.container.log_ls(LOG_TRACE, "Local Link State: %r" %
self.link_state)
+
+ ##
+ ## Recompute the topology
+ ##
+ if self.recompute_topology:
+ self.recompute_topology = False
+ collection = {self.my_id : self.link_state}
+ for node_id, node in self.nodes.items():
+ collection[node_id] = node.link_state
+ next_hops, valid_origins =
self.container.path_engine.calculate_routes(collection)
+ self.container.log_ls(LOG_TRACE, "Computed next hops: %r" %
next_hops)
+ self.container.log_ls(LOG_TRACE, "Computed valid origins: %r" %
valid_origins)
+
+ ##
+ ## Update the next hops and valid origins for each node
+ ##
+ for node_id, next_hop_id in next_hops.items():
+ node = self.nodes[node_id]
+ next_hop = self.nodes[next_hop_id]
+ vo = valid_origins[node_id]
+ node.set_next_hop(next_hop)
+ node.set_valid_origins(vo)
+
+ ##
+ ## Send link-state requests and mobile-address requests to the nodes
+ ## that have pending requests and are reachable
+ ##
+ for node_id, node in self.nodes.items():
+ if node.link_state_requested():
+ self.container.link_state_engine.send_lsr(node_id)
+ if node.mobile_address_requested():
+ self.container.mobile_address_engine.send_mar(node_id,
node.mobile_address_sequence)
+
+ ##
+ ## If local changes have been made to the list of mobile addresses, send
+ ## an unsolicited mobile-address-update to all routers.
+ ##
+ mobile_seq = self.container.mobile_address_engine.tick(now)
+ self.container.link_state_engine.set_mobile_seq(mobile_seq)
+
+ ##
+ ## Send an immediate RA if our link state changed
+ ##
+ if send_ra:
+ self.container.link_state_engine.send_ra(now)
+
+
+ def neighbor_refresh(self, node_id, instance, link_id, now):
+ """
+ Invoked when the hello protocol has received positive confirmation
+ of continued bi-directional connectivity with a neighbor router.
+ """
+
+ ##
+ ## If the node id is not known, create a new RouterNode to track it.
+ ##
+ if node_id not in self.nodes:
+ self.nodes[node_id] = RouterNode(self, node_id, instance)
+ node = self.nodes[node_id]
- def new_neighbor(self, node_id, link_maskbit, instance):
- """
- A node, designated by node_id, has been discovered as a neighbor over
a link with
- a maskbit of link_maskbit.
- """
- self.nodes_by_link_id[link_maskbit] = node_id
- if node_id in self.nodes:
+ ##
+ ## Set the link_id to indicate this is a neighbor router. If the link_id
+ ## changed, update the index and add the neighbor to the local link state.
+ ##
+ if node.set_link_id(link_id):
+ self.nodes_by_link_id[link_id] = node
+ node.request_link_state()
+ if self.link_state.add_peer(node_id):
+ self.link_state_changed = True
+
+ ##
+ ## Update the refresh time for later expiration checks
+ ##
+ node.neighbor_refresh_time = now
+
+ ##
+ ## If the instance was updated (i.e. the neighbor restarted suddenly),
+ ## schedule a topology recompute and a link-state-request to that router.
+ ##
+ if node.update_instance(instance):
+ self.recompute_topology = True
+ node.request_link_state()
+
+
+ def link_lost(self, link_id):
+ """
+ Invoked when an inter-router link is dropped.
+ """
+ self.container.log_ls(LOG_INFO, "Router Link Lost - link_id=%d" %
link_id)
+ node_id = self.link_id_to_node_id(link_id)
+ if node_id:
+ self.nodes_by_link_id.pop(link_id)
node = self.nodes[node_id]
- if node.neighbor:
- if node.update_instance(instance):
- self.container.del_neighbor_router(node.id, node.maskbit)
- else:
- return
- else:
- self.container.del_remote_router(node.id, node.maskbit)
- node.neighbor = True
- node.link_id = link_maskbit
- else:
- node = RemoteNode(node_id, self._allocate_maskbit(), True,
link_maskbit, instance)
- self.nodes[node_id] = node
- self.container.add_neighbor_router(self._address(node_id),
node.maskbit, link_maskbit)
+ node.remove_link()
+ if self.link_state.del_peer(node_id):
+ self.link_state_changed = True
- def lost_neighbor(self, node_id):
+ def ra_received(self, node_id, ls_seq, mobile_seq, instance, now):
"""
- We have lost contact with a neighboring node.
+ Invoked when a router advertisement is received from another router.
"""
+ ##
+ ## If the node id is not known, create a new RouterNode to track it.
+ ##
+ if node_id not in self.nodes:
+ self.nodes[node_id] = RouterNode(self, node_id, instance)
node = self.nodes[node_id]
- node.neighbor = False
- self.nodes_by_link_id.pop(node.link_id)
- node.link_id = None
- self.container.del_neighbor_router(node.id, node.maskbit)
- if node.remote:
- self.container.add_remote_router(self._address(node.id),
node.maskbit)
- else:
- self._free_maskbit(node.maskbit)
- self.nodes.pop(node_id)
+ ##
+ ## If the instance was updated (i.e. the router restarted suddenly),
+ ## schedule a topology recompute and a link-state-request to that router.
+ ##
+ if node.update_instance(instance):
+ self.recompute_topology = True
+ node.request_link_state()
- def new_node(self, node_id, instance):
- """
- A node, designated by node_id, has been discovered through the an
advertisement from a
- remote peer.
- """
- if node_id not in self.nodes:
- node = RemoteNode(node_id, self._allocate_maskbit(), False, None,
instance)
- self.nodes[node_id] = node
- self.container.add_remote_router(self._address(node.id),
node.maskbit)
- else:
- node = self.nodes[node_id]
- node.remote = True
- if node.update_instance(instance):
- self.container.del_remote_router(node.id, node.maskbit)
- self.container.add_remote_router(self._address(node.id),
node.maskbit)
+ ##
+ ## Update the last seen time to now to control expiration of the link state.
+ ##
+ node.link_state.last_seen = now
+ ##
+ ## Check the link state sequence. Send a link state request if our records are
+ ## not up to date.
+ ##
+ if node.link_state.ls_seq < ls_seq:
+ self.container.link_state_engine.send_lsr(node_id)
- def lost_node(self, node_id):
- """
- A remote node, node_id, has not been heard from for too long and is
being deemed lost.
- """
- node = self.nodes[node_id]
- if node.remote:
- node.remote = False
- if not node.neighbor:
- self.container.del_remote_router(node.id, node.maskbit)
- self._free_maskbit(node.maskbit)
- self.nodes.pop(node_id)
+ ##
+ ## Check the mobile sequence. Send a mobile-address-request if we are
+ ## behind the advertized sequence.
+ ##
+ if node.mobile_address_sequence < mobile_seq:
+ node.mobile_address_request()
- def touch_node(self, node_id, instance):
+ def router_learned(self, node_id):
"""
- We've received an advertisement or hello from a node. If the instance
has changed,
- we need to treat the node as though it was lost and regained.
+ Invoked when we learn about another router by any means
"""
- try:
- node = self.nodes[node_id]
- if node.update_instance(instance):
- if node.neighbor:
- self.container.del_neighbor_router(node.id, node.maskbit)
- self.container.add_neighbor_router(self._address(node_id),
node.maskbit, node.link_id)
- elif node.remote:
- self.container.del_remote_router(node.id, node.maskbit)
- self.container.add_remote_router(self._address(node.id),
node.maskbit)
- return True
- except:
- pass
- return False
+ if node_id not in self.nodes and node_id != self.my_id:
+ self.nodes[node_id] = RouterNode(self, node_id, None)
- def maskbit_for_node(self, node_id):
+ def link_state_received(self, node_id, link_state, instance, now):
"""
+ Invoked when a link state update is received from another router.
"""
- if node_id in self.nodes:
- node = self.nodes[node_id]
- return node.maskbit
- return None
-
-
- def add_addresses(self, node_id, addrs):
+ ##
+ ## If the node id is not known, create a new RouterNode to track it.
+ ##
+ if node_id not in self.nodes:
+ self.nodes[node_id] = RouterNode(self, node_id, instance)
node = self.nodes[node_id]
- for a in addrs:
- node.addrs[a] = 1
+ ##
+ ## If the new link state is more up-to-date than the stored link state,
+ ## update it and schedule a topology recompute.
+ ##
+ if link_state.ls_seq > node.link_state.ls_seq:
+ node.link_state = link_state
+ node.link_state.last_seen = now
+ self.recompute_topology = True
+
+ ##
+ ## Look through the new link state for references to nodes that we don't
+ ## know about. Schedule link state requests for those nodes to be sent
+ ## after we next recompute the topology.
+ ##
+ for peer in node.link_state.peers:
+ if peer not in self.nodes:
+ self.router_learned(peer)
- def del_addresses(self, node_id, addrs):
- node = self.nodes[node_id]
- for a in addrs:
- node.addrs.pop(a)
-
- def overwrite_addresses(self, node_id, addrs):
- node = self.nodes[node_id]
- added = []
- deleted = []
- for a in addrs:
- if a not in node.addrs.keys():
- added.append(a)
- for a in node.addrs.keys():
- if a not in addrs:
- deleted.append(a)
- node.addrs = {}
- for a in addrs:
- node.addrs[a] = 1
- return (added, deleted)
+ def router_node(self, node_id):
+ return self.nodes[node_id]
def link_id_to_node_id(self, link_id):
if link_id in self.nodes_by_link_id:
- return self.nodes_by_link_id[link_id]
+ return self.nodes_by_link_id[link_id].id
return None
@@ -192,20 +315,148 @@ class NodeTracker(object):
self.next_maskbit = i
- def _address(self, node_id):
- return "amqp:/_topo/%s/%s" % (self.container.area, node_id)
+class RouterNode(object):
+ """
+ RouterNode is used to track remote routers in the router network.
+ """
-class RemoteNode(object):
+ def __init__(self, parent, node_id, instance):
+ self.parent = parent
+ self.adapter = parent.container.router_adapter
+ self.log = parent.container.log
+ self.id = node_id
+ self.instance = instance
+ self.maskbit = self.parent._allocate_maskbit()
+ self.neighbor_refresh_time = 0.0
+ self.peer_link_id = None
+ self.link_state = LinkState(None, self.id, 0, [])
+ self.next_hop_router = None
+ self.valid_origins = None
+ self.mobile_addresses = []
+ self.mobile_address_sequence = 0
+ self.need_ls_request = True
+ self.need_mobile_request = False
+ self.keep_alive_count = 0
+ self.adapter.add_router("amqp:/_topo/0/%s/qdrouter" % self.id,
self.maskbit)
+ self.log(LOG_TRACE, "Node %s created: maskbit=%d" % (self.id,
self.maskbit))
+
+
+ def set_link_id(self, link_id):
+ if self.peer_link_id == link_id:
+ return False
+ self.peer_link_id = link_id
+ self.next_hop_router = None
+ self.adapter.set_link(self.maskbit, link_id)
+ self.adapter.remove_next_hop(self.maskbit)
+ self.log(LOG_TRACE, "Node %s link set: link_id=%r" % (self.id,
link_id))
+ return True
+
+
+ def remove_link(self):
+ if self.peer_link_id != None:
+ self.peer_link_id = None
+ self.adapter.remove_link(self.maskbit)
+ self.log(LOG_TRACE, "Node %s link removed" % self.id)
- def __init__(self, node_id, maskbit, neighbor, link_id, instance):
- self.id = node_id
- self.maskbit = maskbit
- self.neighbor = neighbor
- self.remote = not neighbor
- self.link_id = link_id
- self.instance = instance
- self.addrs = {} # Address => Count at Node (1 only for the present)
+
+ def delete(self):
+ self.unmap_all_addresses()
+ self.adapter.del_router(self.maskbit)
+ self.parent._free_maskbit(self.maskbit)
+ self.log(LOG_TRACE, "Node %s deleted" % self.id)
+
+
+ def set_next_hop(self, next_hop):
+ if self.id == next_hop.id:
+ return
+ if self.next_hop_router and self.next_hop_router.id == next_hop.id:
+ return
+ self.next_hop_router = next_hop
+ self.adapter.set_next_hop(self.maskbit, next_hop.maskbit)
+ self.log(LOG_TRACE, "Node %s next hop set: %s" % (self.id,
next_hop.id))
+
+
+ def set_valid_origins(self, valid_origins):
+ if self.valid_origins == valid_origins:
+ return
+ self.valid_origins = valid_origins
+ vo_mb = [self.parent.nodes[N].maskbit for N in valid_origins]
+ self.adapter.set_valid_origins(self.maskbit, vo_mb)
+ self.log(LOG_TRACE, "Node %s valid origins: %r" % (self.id,
valid_origins))
+
+
+ def remove_next_hop(self):
+ if self.next_hop_router:
+ self.next_hop_router = None
+ self.adapter.remove_next_hop(self.maskbit)
+ self.log(LOG_TRACE, "Node %s next hop removed" % self.id)
+
+
+ def is_neighbor(self):
+ return self.peer_link_id != None
+
+
+ def request_link_state(self):
+ """
+ Set the link-state-requested flag so we can send this node a link-state
+ request at the most opportune time.
+ """
+ self.need_ls_request = True
+
+
+ def link_state_requested(self):
+ """
+ Return True iff we need to request this node's link state AND the node is
+ reachable. There's no point in sending it a request if we don't know how to
+ reach it.
+ """
+ if self.need_ls_request and (self.peer_link_id != None or
self.next_hop_router != None):
+ self.need_ls_request = False
+ return True
+ return False
+
+
+ def mobile_address_request(self):
+ self.need_mobile_request = True
+
+
+ def mobile_address_requested(self):
+ if self.need_mobile_request and (self.peer_link_id != None or
self.next_hop_router != None):
+ self.need_mobile_request = False
+ return True
+ return False
+
+
+ def map_address(self, addr):
+ self.mobile_addresses.append(addr)
+ self.adapter.map_destination(addr[0], addr[1:], self.maskbit)
+
+
+ def unmap_address(self, addr):
+ self.mobile_addresses.remove(addr)
+ self.adapter.unmap_destination(addr[0], addr[1:], self.maskbit)
+
+
+ def unmap_all_addresses(self):
+ self.mobile_address_sequence = 0
+ for addr in self.mobile_addresses:
+ self.unmap_address(addr)
+
+
+ def overwrite_addresses(self, addrs):
+ added = []
+ deleted = []
+ for a in addrs:
+ if a not in self.mobile_addresses:
+ added.append(a)
+ for a in self.mobile_addresses:
+ if a not in addrs:
+ deleted.append(a)
+ for a in added:
+ self.map_address(a)
+ for a in deleted:
+ self.unmap_address(a)
def update_instance(self, instance):
@@ -216,5 +467,10 @@ class RemoteNode(object):
return False
if self.instance == instance:
return False
+
self.instance = instance
+ self.link_state.del_all_peers()
+ self.unmap_all_addresses()
+ self.log(LOG_TRACE, "Node %s detected restart" % self.id)
return True
+
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/path.py Wed Jan 21
15:22:57 2015
@@ -20,29 +20,15 @@
class PathEngine(object):
"""
- This module is responsible for computing the next-hop for every
router/area in the domain
+ This module is responsible for computing the next-hop for every router in the domain
based on the collection of link states that have been gathered.
"""
def __init__(self, container):
self.container = container
self.id = self.container.id
- self.area = self.container.area
- self.recalculate = False
- self.collection = None
- def tick(self, now_unused):
- if self.recalculate:
- self.recalculate = False
- self._calculate_routes()
-
-
- def ls_collection_changed(self, collection):
- self.recalculate = True
- self.collection = collection
-
-
- def _calculate_tree_from_root(self, root):
+ def _calculate_tree_from_root(self, root, collection):
##
## Make a copy of the current collection of link-states that contains
## a fake link-state for nodes that are known-peers but are not in the
@@ -50,7 +36,7 @@ class PathEngine(object):
## so we can trade link-state information with them.
##
link_states = {}
- for _id, ls in self.collection.items():
+ for _id, ls in collection.items():
link_states[_id] = ls.peers
for p in ls.peers:
if p not in link_states:
@@ -98,7 +84,7 @@ class PathEngine(object):
return prev
- def _calculate_valid_origins(self, nodeset):
+ def _calculate_valid_origins(self, nodeset, collection):
##
## Calculate the tree from each origin, determine the set of origins-per-dest
## for which the path from origin to dest passes through us. This is the set
@@ -110,7 +96,7 @@ class PathEngine(object):
valid_origin[node] = []
for root in valid_origin.keys():
- prev = self._calculate_tree_from_root(root)
+ prev = self._calculate_tree_from_root(root, collection)
nodes = prev.keys()
while len(nodes) > 0:
u = nodes[0]
@@ -129,11 +115,11 @@ class PathEngine(object):
return valid_origin
- def _calculate_routes(self):
+ def calculate_routes(self, collection):
##
## Generate the shortest-path tree with the local node as root
##
- prev = self._calculate_tree_from_root(self.id)
+ prev = self._calculate_tree_from_root(self.id, collection)
nodes = prev.keys()
##
@@ -154,13 +140,12 @@ class PathEngine(object):
for w in path: # mark each node in the path as reachable
via the next hop
next_hops[w] = u
- self.container.next_hops_changed(next_hops)
-
##
## Calculate the valid origins for remote routers
##
- valid_origin = self._calculate_valid_origins(prev.keys())
- self.container.valid_origins_changed(valid_origin)
+ valid_origins = self._calculate_valid_origins(prev.keys(), collection)
+
+ return (next_hops, valid_origins)
Modified: qpid/dispatch/trunk/src/python_embedded.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/python_embedded.c?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/python_embedded.c (original)
+++ qpid/dispatch/trunk/src/python_embedded.c Wed Jan 21 15:22:57 2015
@@ -52,9 +52,11 @@ void qd_python_initialize(qd_dispatch_t
ilock = sys_mutex();
if (python_pkgdir)
dispatch_python_pkgdir = PyString_FromString(python_pkgdir);
- Py_Initialize();
+ qd_python_lock_state_t ls = qd_python_lock();
+ Py_Initialize();
qd_python_setup();
+ qd_python_unlock(ls);
}
Modified: qpid/dispatch/trunk/src/router_node.c
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Wed Jan 21 15:22:57 2015
@@ -1449,8 +1449,9 @@ qd_router_t *qd_router(qd_dispatch_t *qd
// locally later in the initialization sequence.
//
if (router->router_mode == QD_ROUTER_MODE_INTERIOR) {
- router->router_addr = qd_router_register_address(qd, "qdrouter", 0,
QD_SEMANTICS_ROUTER_CONTROL, false, 0);
- router->hello_addr = qd_router_register_address(qd, "qdhello", 0,
QD_SEMANTICS_ROUTER_CONTROL, false, 0);
+ router->router_addr = qd_router_register_address(qd, "qdrouter", 0,
QD_SEMANTICS_ROUTER_CONTROL, false, 0);
+ router->routerma_addr = qd_router_register_address(qd, "qdrouter.ma",
0, QD_SEMANTICS_DEFAULT, false, 0);
+ router->hello_addr = qd_router_register_address(qd, "qdhello", 0,
QD_SEMANTICS_ROUTER_CONTROL, false, 0);
}
//
Modified: qpid/dispatch/trunk/src/router_private.h
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1653556&r1=1653555&r2=1653556&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Wed Jan 21 15:22:57 2015
@@ -219,6 +219,7 @@ struct qd_router_t {
qd_address_list_t addrs;
qd_hash_t *addr_hash;
qd_address_t *router_addr;
+ qd_address_t *routerma_addr;
qd_address_t *hello_addr;
qd_router_link_list_t links;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org