Repository: qpid-dispatch Updated Branches: refs/heads/master 7d6c05376 -> 3bb2c53d8
DISPATCH-629 - Added protocol-version to inter-router messages for future backward compatibility. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/3bb2c53d Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/3bb2c53d Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/3bb2c53d Branch: refs/heads/master Commit: 3bb2c53d8dcd708e50340e458b84a6507dbb8b13 Parents: 7d6c053 Author: Ted Ross <[email protected]> Authored: Fri Feb 3 15:17:09 2017 -0500 Committer: Ted Ross <[email protected]> Committed: Fri Feb 3 15:17:09 2017 -0500 ---------------------------------------------------------------------- python/qpid_dispatch/management/qdrouter.json | 4 ++ python/qpid_dispatch_internal/router/data.py | 52 ++++++++++++++++------ python/qpid_dispatch_internal/router/hello.py | 2 +- python/qpid_dispatch_internal/router/link.py | 6 +-- python/qpid_dispatch_internal/router/node.py | 50 +++++++++++++++------ tests/router_engine_test.py | 4 +- tools/qdstat | 22 ++++----- 7 files changed, 96 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch/management/qdrouter.json ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json index f4ab551..3fcadb4 100644 --- a/python/qpid_dispatch/management/qdrouter.json +++ b/python/qpid_dispatch/management/qdrouter.json @@ -1219,6 +1219,10 @@ "description": "Remote node identifier.", "type": "string" }, + "protocolVersion": { + "description": "Router-protocol version supported by the node.", + "type": "integer" + }, "instance": { "description": "Remote node boot number.", "type": "integer" http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/data.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/data.py b/python/qpid_dispatch_internal/router/data.py index 5b739b8..a2b669b 100644 --- a/python/qpid_dispatch_internal/router/data.py +++ b/python/qpid_dispatch_internal/router/data.py @@ -17,6 +17,12 @@ # under the License. # +## +## Define the current protocol version. Any messages that do not contain version +## information shall be considered to be coming from routers using version 0. +## +ProtocolVersion = 1L + def getMandatory(data, key, cls=None): """ Get the value mapped to the requested key. If it's not present, raise an exception. @@ -107,20 +113,23 @@ class MessageHELLO(object): self.area = '0' self.seen_peers = getMandatory(body, 'seen', list) self.instance = getOptional(body, 'instance', 0, long) + self.version = getOptional(body, 'pv', 0, long) else: self.id = _id self.area = '0' self.seen_peers = _seen_peers self.instance = _instance + self.version = ProtocolVersion def __repr__(self): - return "HELLO(id=%s area=%s inst=%d seen=%r)" % (self.id, self.area, self.instance, self.seen_peers) + return "HELLO(id=%s pv=%d area=%s inst=%d seen=%r)" % (self.id, self.version, self.area, self.instance, self.seen_peers) def get_opcode(self): return 'HELLO' def to_dict(self): return {'id' : self.id, + 'pv' : self.version, 'area' : self.area, 'instance' : self.instance, 'seen' : self.seen_peers} @@ -143,22 +152,25 @@ class MessageRA(object): self.ls_seq = getMandatory(body, 'ls_seq', long) self.mobile_seq = getMandatory(body, 'mobile_seq', long) self.instance = getOptional(body, 'instance', 0, long) + self.version = getOptional(body, 'pv', 0, long) else: self.id = _id self.area = '0' self.ls_seq = long(_ls_seq) self.mobile_seq = long(_mobile_seq) self.instance = _instance + self.version = ProtocolVersion def get_opcode(self): return 'RA' def __repr__(self): - return "RA(id=%s area=%s inst=%d ls_seq=%d mobile_seq=%d)" % \ - (self.id, self.area, self.instance, self.ls_seq, self.mobile_seq) + return "RA(id=%s pv=%d area=%s inst=%d ls_seq=%d mobile_seq=%d)" % \ + (self.id, self.version, self.area, self.instance, self.ls_seq, self.mobile_seq) def to_dict(self): return {'id' : self.id, + 'pv' : self.version, 'area' : self.area, 'instance' : self.instance, 'ls_seq' : self.ls_seq, @@ -175,22 +187,25 @@ class MessageLSU(object): self.ls_seq = getMandatory(body, 'ls_seq', long) self.ls = LinkState(getMandatory(body, 'ls', dict)) self.instance = getOptional(body, 'instance', 0, long) + self.version = getOptional(body, 'pv', 0, long) else: self.id = _id self.area = '0' self.ls_seq = long(_ls_seq) self.ls = _ls self.instance = _instance + self.version = ProtocolVersion def get_opcode(self): return 'LSU' def __repr__(self): - return "LSU(id=%s area=%s inst=%d ls_seq=%d ls=%r)" % \ - (self.id, self.area, self.instance, self.ls_seq, self.ls) + return "LSU(id=%s pv=%d area=%s inst=%d ls_seq=%d ls=%r)" % \ + (self.id, self.version, self.area, self.instance, self.ls_seq, self.ls) def to_dict(self): return {'id' : self.id, + 'pv' : self.version, 'area' : self.area, 'instance' : self.instance, 'ls_seq' : self.ls_seq, @@ -203,20 +218,23 @@ class MessageLSR(object): def __init__(self, body, _id=None): if body: self.id = getMandatory(body, 'id', str) + self.version = getOptional(body, 'pv', 0, long) self.area = '0' else: self.id = _id + self.version = ProtocolVersion self.area = '0' def get_opcode(self): return 'LSR' def __repr__(self): - return "LSR(id=%s area=%s)" % (self.id, self.area) + return "LSR(id=%s pv=%d area=%s)" % (self.id, self.version, self.area) def to_dict(self): - return {'id' : self.id, - 'area' : self.area} + return {'id' : self.id, + 'pv' : self.version, + 'area' : self.area} class MessageMAU(object): @@ -225,6 +243,7 @@ class MessageMAU(object): def __init__(self, body, _id=None, _seq=None, _add_list=None, _del_list=None, _exist_list=None): if body: self.id = getMandatory(body, 'id', str) + self.version = getOptional(body, 'pv', 0, long) self.area = '0' self.mobile_seq = getMandatory(body, 'mobile_seq', long) self.add_list = getOptional(body, 'add', None, list) @@ -232,6 +251,7 @@ class MessageMAU(object): self.exist_list = getOptional(body, 'exist', None, list) else: self.id = _id + self.version = ProtocolVersion self.area = '0' self.mobile_seq = long(_seq) self.add_list = _add_list @@ -248,13 +268,14 @@ class MessageMAU(object): if self.add_list != None: _add = ' add=%r' % self.add_list if self.del_list != None: _del = ' del=%r' % self.del_list if self.exist_list != None: _exist = ' exist=%r' % self.exist_list - return "MAU(id=%s area=%s mobile_seq=%d%s%s%s)" % \ - (self.id, self.area, self.mobile_seq, _add, _del, _exist) + return "MAU(id=%s pv=%d area=%s mobile_seq=%d%s%s%s)" % \ + (self.id, self.version, self.area, self.mobile_seq, _add, _del, _exist) def to_dict(self): - body = { 'id' : self.id, - 'area' : self.area, - 'mobile_seq' : self.mobile_seq } + body = {'id' : self.id, + 'pv' : self.version, + 'area' : self.area, + 'mobile_seq' : self.mobile_seq } if self.add_list != None: body['add'] = self.add_list if self.del_list != None: body['del'] = self.del_list if self.exist_list != None: body['exist'] = self.exist_list @@ -267,10 +288,12 @@ class MessageMAR(object): def __init__(self, body, _id=None, _have_seq=None): if body: self.id = getMandatory(body, 'id', str) + self.version = getOptional(body, 'pv', 0, long) self.area = '0' self.have_seq = getMandatory(body, 'have_seq', long) else: self.id = _id + self.version = ProtocolVersion self.area = '0' self.have_seq = long(_have_seq) @@ -278,9 +301,10 @@ class MessageMAR(object): return 'MAR' def __repr__(self): - return "MAR(id=%s area=%s have_seq=%d)" % (self.id, self.area, self.have_seq) + return "MAR(id=%s pv=%d area=%s have_seq=%d)" % (self.id, self.version, self.area, self.have_seq) def to_dict(self): return {'id' : self.id, + 'pv' : self.version, 'area' : self.area, 'have_seq' : self.have_seq} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/hello.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/hello.py b/python/qpid_dispatch_internal/router/hello.py index 8c3224a..cbd55ed 100644 --- a/python/qpid_dispatch_internal/router/hello.py +++ b/python/qpid_dispatch_internal/router/hello.py @@ -54,7 +54,7 @@ class HelloProtocol(object): return self.hellos[msg.id] = now if msg.is_seen(self.id): - self.node_tracker.neighbor_refresh(msg.id, msg.instance, link_id, cost, now) + self.node_tracker.neighbor_refresh(msg.id, msg.version, msg.instance, link_id, cost, now) def _expire_hellos(self, now): http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/link.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/link.py b/python/qpid_dispatch_internal/router/link.py index 37671e4..13bd1b5 100644 --- a/python/qpid_dispatch_internal/router/link.py +++ b/python/qpid_dispatch_internal/router/link.py @@ -49,19 +49,19 @@ class LinkStateEngine(object): def handle_ra(self, msg, now): if msg.id == self.id: return - self.node_tracker.ra_received(msg.id, msg.ls_seq, msg.mobile_seq, msg.instance, now) + self.node_tracker.ra_received(msg.id, msg.version, msg.ls_seq, msg.mobile_seq, msg.instance, now) def handle_lsu(self, msg, now): if msg.id == self.id: return - self.node_tracker.link_state_received(msg.id, msg.ls, msg.instance, now) + self.node_tracker.link_state_received(msg.id, msg.version, msg.ls, msg.instance, now) def handle_lsr(self, msg, now): if msg.id == self.id: return - self.node_tracker.router_learned(msg.id) + self.node_tracker.router_learned(msg.id, msg.version) my_ls = self.node_tracker.link_state smsg = MessageLSU(None, self.id, my_ls.ls_seq, my_ls, self.container.instance) self.container.send('amqp:/_topo/%s/%s/qdrouter' % (msg.area, msg.id), smsg) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/python/qpid_dispatch_internal/router/node.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/router/node.py b/python/qpid_dispatch_internal/router/node.py index 9822169..a9af3c6 100644 --- a/python/qpid_dispatch_internal/router/node.py +++ b/python/qpid_dispatch_internal/router/node.py @@ -18,7 +18,7 @@ # from ..dispatch import LOG_INFO, LOG_TRACE, LOG_DEBUG -from data import LinkState +from data import LinkState, ProtocolVersion from .address import Address class NodeTracker(object): @@ -55,6 +55,7 @@ class NodeTracker(object): """Refresh management attributes""" attributes.update({ "id": self.my_id, + "protocolVersion": ProtocolVersion, "instance": self.container.instance, # Boot number, integer "linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes "nextHop": "(self)", @@ -188,7 +189,7 @@ class NodeTracker(object): self.container.link_state_engine.send_ra(now) - def neighbor_refresh(self, node_id, instance, link_id, cost, now): + def neighbor_refresh(self, node_id, version, instance, link_id, cost, now): """ Invoked when the hello protocol has received positive confirmation of continued bi-directional connectivity with a neighbor router. @@ -198,10 +199,16 @@ class NodeTracker(object): ## If the node id is not known, create a new RouterNode to track it. ## if node_id not in self.nodes: - self.nodes[node_id] = RouterNode(self, node_id, instance) + self.nodes[node_id] = RouterNode(self, node_id, version, instance) node = self.nodes[node_id] ## + ## Add the version if we haven't already done so. + ## + if node.version == None: + node.version = version + + ## ## Set the link_id to indicate this is a neighbor router. If the link_id ## changed, update the index and add the neighbor to the local link state. ## @@ -220,7 +227,7 @@ class NodeTracker(object): ## If the instance was updated (i.e. the neighbor restarted suddenly), ## schedule a topology recompute and a link-state-request to that router. ## - if node.update_instance(instance): + if node.update_instance(instance, version): self.recompute_topology = True node.request_link_state() @@ -247,7 +254,7 @@ class NodeTracker(object): return result - def ra_received(self, node_id, ls_seq, mobile_seq, instance, now): + def ra_received(self, node_id, version, ls_seq, mobile_seq, instance, now): """ Invoked when a router advertisement is received from another router. """ @@ -255,14 +262,20 @@ class NodeTracker(object): ## If the node id is not known, create a new RouterNode to track it. ## if node_id not in self.nodes: - self.nodes[node_id] = RouterNode(self, node_id, instance) + self.nodes[node_id] = RouterNode(self, node_id, version, instance) node = self.nodes[node_id] ## + ## Add the version if we haven't already done so. + ## + if node.version == None: + node.version = version + + ## ## If the instance was updated (i.e. the router restarted suddenly), ## schedule a topology recompute and a link-state-request to that router. ## - if node.update_instance(instance): + if node.update_instance(instance, version): self.recompute_topology = True node.request_link_state() @@ -286,15 +299,15 @@ class NodeTracker(object): node.mobile_address_request() - def router_learned(self, node_id): + def router_learned(self, node_id, version): """ Invoked when we learn about another router by any means """ if node_id not in self.nodes and node_id != self.my_id: - self.nodes[node_id] = RouterNode(self, node_id, None) + self.nodes[node_id] = RouterNode(self, node_id, version, None) - def link_state_received(self, node_id, link_state, instance, now): + def link_state_received(self, node_id, version, link_state, instance, now): """ Invoked when a link state update is received from another router. """ @@ -302,10 +315,16 @@ class NodeTracker(object): ## If the node id is not known, create a new RouterNode to track it. ## if node_id not in self.nodes: - self.nodes[node_id] = RouterNode(self, node_id, instance) + self.nodes[node_id] = RouterNode(self, node_id, version, instance) node = self.nodes[node_id] ## + ## Add the version if we haven't already done so. + ## + if node.version == None: + node.version = version + + ## ## If the new link state is more up-to-date than the stored link state, ## update it and schedule a topology recompute. ## @@ -321,7 +340,7 @@ class NodeTracker(object): ## for peer in node.link_state.peers: if peer not in self.nodes: - self.router_learned(peer) + self.router_learned(peer, None) def router_node(self, node_id): @@ -359,11 +378,12 @@ class RouterNode(object): RouterNode is used to track remote routers in the router network. """ - def __init__(self, parent, node_id, instance): + def __init__(self, parent, node_id, version, instance): self.parent = parent self.adapter = parent.container.router_adapter self.log = parent.container.log self.id = node_id + self.version = version self.instance = instance self.maskbit = self.parent._allocate_maskbit() self.neighbor_refresh_time = 0.0 @@ -385,6 +405,7 @@ class RouterNode(object): """Refresh management attributes""" attributes.update({ "id": self.id, + "protocolVersion": self.version, "instance": self.instance, # Boot number, integer "linkState": [ls for ls in self.link_state.peers], # List of neighbour nodes "nextHop": self.next_hop_router and self.next_hop_router.id, @@ -531,7 +552,7 @@ class RouterNode(object): self.unmap_address(a) - def update_instance(self, instance): + def update_instance(self, instance, version): if instance == None: return False if self.instance == None: @@ -541,6 +562,7 @@ class RouterNode(object): return False self.instance = instance + self.version = version self.link_state.del_all_peers() self.unmap_all_addresses() self.log(LOG_INFO, "Detected Restart of Router Node %s" % self.id) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/tests/router_engine_test.py ---------------------------------------------------------------------- diff --git a/tests/router_engine_test.py b/tests/router_engine_test.py index 8a18baa..8132270 100644 --- a/tests/router_engine_test.py +++ b/tests/router_engine_test.py @@ -25,7 +25,7 @@ import mock # Mock definitions for tests. sys.path.append(os.path.join(os.environ["SOURCE_DIR"], "python")) from qpid_dispatch_internal.router.engine import HelloProtocol, PathEngine, NodeTracker -from qpid_dispatch_internal.router.data import LinkState, MessageHELLO +from qpid_dispatch_internal.router.data import LinkState, MessageHELLO, ProtocolVersion from qpid_dispatch.management.entity import EntityBase from system_test import main_module @@ -141,7 +141,7 @@ class NeighborTest(unittest.TestCase): def send(self, dest, msg): self.sent.append((dest, msg)) - def neighbor_refresh(self, node_id, instance, link_id, cost, now): + def neighbor_refresh(self, node_id, ProtocolVersion, instance, link_id, cost, now): self.neighbors[node_id] = (instance, link_id, cost, now) def setUp(self): http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/3bb2c53d/tools/qdstat ---------------------------------------------------------------------- diff --git a/tools/qdstat b/tools/qdstat index b85e204..6794495 100755 --- a/tools/qdstat +++ b/tools/qdstat @@ -66,6 +66,12 @@ def parse_args(argv): return opts, args +def get(obj, attr): + if attr in obj.__dict__: + return obj.__dict__[attr] + return None + + class BusManager(Node): schema = QdSchema() @@ -138,10 +144,7 @@ class BusManager(Node): heads.append(Header("tenant")) rows = [] - cols = ('identity', 'host', 'container', 'role', 'dir', 'isAuthenticated', 'sasl', - 'user', 'isEncrypted', 'sslProto', 'sslCipher', 'tenant') - - objects = self.query('org.apache.qpid.dispatch.connection', cols, limit=self.opts.limit) + objects = self.query('org.apache.qpid.dispatch.connection', limit=self.opts.limit) for conn in objects: row = [] @@ -152,7 +155,7 @@ class BusManager(Node): row.append(conn.dir) row.append(self.connSecurity(conn)) row.append(self.connAuth(conn)) - row.append(self.noTrailingSlash(conn.tenant)) + row.append(self.noTrailingSlash(get(conn, 'tenant'))) rows.append(row) title = "Connections" dispRows = rows @@ -307,14 +310,12 @@ class BusManager(Node): heads.append(Header("next-hop")) heads.append(Header("link")) if self.opts.verbose: + heads.append(Header("ver")) heads.append(Header("cost")) heads.append(Header("neighbors")) heads.append(Header("valid-origins")) rows = [] - cols = ('id', 'nextHop', 'routerLink', 'lastTopoChange') - if self.opts.verbose: - cols += ('cost', 'linkState', 'validOrigins') - objects = self.query('org.apache.qpid.dispatch.router.node', cols, limit=self.opts.limit) + objects = self.query('org.apache.qpid.dispatch.router.node', limit=self.opts.limit) # Find the most recent topo change in this neighborhood. lastTopoChange = 0.0 @@ -333,7 +334,8 @@ class BusManager(Node): row.append(node.routerLink) if self.opts.verbose: - row.append(node.cost) + row.append(get(node, 'protocolVersion')) + row.append(get(node, 'cost')) row.append('%r' % self._list_clean(node.linkState)) row.append('%r' % self._list_clean(node.validOrigins)) rows.append(row) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
