This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 08454ae9a4ca2a5f707fec6bc41df1da7a33b401 Author: Kenneth Giusti <[email protected]> AuthorDate: Mon Jan 24 17:56:37 2022 -0500 DISPATCH-2317: advertise the version of MA supported by the router --- include/qpid/dispatch/amqp.h | 2 + src/amqp.c | 1 + src/router_node.c | 12 +++- src/server.c | 8 +++ tests/system_tests_open_properties.py | 117 ++++++++++++++++++++++++++++++++++ 5 files changed, 139 insertions(+), 1 deletion(-) diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h index 3d7b1d7..4f307ba 100644 --- a/include/qpid/dispatch/amqp.h +++ b/include/qpid/dispatch/amqp.h @@ -106,6 +106,7 @@ typedef enum { /** @name Message Annotation Headers */ /// @{ +#define QD_ROUTER_ANNOTATIONS_VERSION 1 extern const char * const QD_MA_PREFIX; extern const char * const QD_MA_INGRESS; ///< Ingress Router extern const char * const QD_MA_TRACE; ///< Trace @@ -170,6 +171,7 @@ extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY; extern const char * const QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY; extern const char * const QD_CONNECTION_PROPERTY_ADAPTOR_KEY; extern const char * const QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE; +extern const char * const QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY; /// @} /** @name Terminus Addresses */ diff --git a/src/amqp.c b/src/amqp.c index cd69dfb..ec8a801 100644 --- a/src/amqp.c +++ b/src/amqp.c @@ -66,6 +66,7 @@ const char * const QD_CONNECTION_PROPERTY_FAILOVER_SCHEME_KEY = "scheme"; const char * const QD_CONNECTION_PROPERTY_FAILOVER_HOSTNAME_KEY = "hostname"; const char * const QD_CONNECTION_PROPERTY_ADAPTOR_KEY = "qd.adaptor"; const char * const QD_CONNECTION_PROPERTY_TCP_ADAPTOR_VALUE = "tcp"; +const char * const QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY = "qd.annotations-version"; const char * const QD_TERMINUS_EDGE_ADDRESS_TRACKING = "_$qd.edge_addr_tracking"; const char * const QD_TERMINUS_ADDRESS_LOOKUP = "_$qd.addr_lookup"; diff --git a/src/router_node.c b/src/router_node.c index 83e74fb..ba01522 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -1310,7 +1310,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool const size_t num_items = pn_data_get_map(props); int props_found = 0; // once all props found exit loop pn_data_enter(props); - for (int i = 0; i < num_items / 2 && props_found < 3; ++i) { + for (int i = 0; i < num_items / 2 && props_found < 4; ++i) { if (!pn_data_next(props)) break; if (pn_data_type(props) != PN_SYMBOL) break; // invalid properties map pn_bytes_t key = pn_data_get_symbol(props); @@ -1344,6 +1344,16 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool rversion[vlen] = 0; } + } else if ((key.size == strlen(QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY) + && strncmp(key.start, QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY, key.size) == 0)) { + props_found += 1; + if (!pn_data_next(props)) break; + if (is_router && pn_data_type(props) == PN_INT) { + const int annos_version = (int) pn_data_get_int(props); + qd_log(router->log_source, QD_LOG_DEBUG, + "Remote router annotations version: %d", annos_version); + } + } else { // skip this key if (!pn_data_next(props)) break; diff --git a/src/server.c b/src/server.c index 26e1342..51755e1 100644 --- a/src/server.c +++ b/src/server.c @@ -489,6 +489,14 @@ static void decorate_connection(qd_server_t *qd_server, pn_connection_t *conn, c } if (config) { + + if (strcmp(config->role, "inter-router") == 0 || strcmp(config->role, "edge") == 0) { + pn_data_put_symbol(pn_connection_properties(conn), + pn_bytes(strlen(QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY), + QD_CONNECTION_PROPERTY_ANNOTATIONS_VERSION_KEY)); + pn_data_put_int(pn_connection_properties(conn), QD_ROUTER_ANNOTATIONS_VERSION); + } + qd_failover_list_t *fol = config->failover_list; if (fol) { pn_data_put_symbol(pn_connection_properties(conn), diff --git a/tests/system_tests_open_properties.py b/tests/system_tests_open_properties.py index 4214b79..3e9979d 100644 --- a/tests/system_tests_open_properties.py +++ b/tests/system_tests_open_properties.py @@ -381,5 +381,122 @@ class OpenPropertiesBadConfigTest(TestCase): self.assertTrue(self._find_in_output(router.outfile + '.out', err)) +def get_log_line(filename, pattern): + with open(filename, 'r') as out_file: + for line in out_file: + if pattern in line: + return line + return None + + +class OpenPropertiesInterRouterTest(TestCase): + """ + Verifies Open Properties passed between routers + """ + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(OpenPropertiesInterRouterTest, cls).setUpClass() + + ir_port = cls.tester.get_port() + cls.RouterA = cls.tester.qdrouterd("RouterA", + Qdrouterd.Config([ + ('router', {'mode': 'interior', + 'id': 'RouterA'}), + ('listener', {'port': + cls.tester.get_port()}), + ('listener', {'role': + 'inter-router', + 'port': + ir_port})]), + wait=False) + cls.RouterB = cls.tester.qdrouterd("RouterB", + Qdrouterd.Config([ + ('router', {'mode': 'interior', + 'id': 'RouterB'}), + ('listener', {'port': + cls.tester.get_port()}), + ('connector', {'role': + 'inter-router', + 'port': + ir_port})]), + wait=True) + cls.RouterA.wait_router_connected('RouterB') + cls.RouterB.wait_router_connected('RouterA') + + def test_01_check_annotations(self): + """ + Verify the router annotations version + """ + a_logfile = self.RouterA.logfile_path + b_logfile = self.RouterB.logfile_path + self.RouterA.teardown() + self.RouterB.teardown() + + log_msg = "ROUTER (debug) Remote router annotations version: 1" + line = get_log_line(a_logfile, log_msg) + self.assertIsNotNone(line) + + line = get_log_line(b_logfile, log_msg) + self.assertIsNotNone(line) + + +class OpenPropertiesEdgeRouterTest(TestCase): + """ + Verifies Open Properties passed between interior and edge routers + """ + @classmethod + def setUpClass(cls): + """Start a router and a messenger""" + super(OpenPropertiesEdgeRouterTest, cls).setUpClass() + + ir_port = cls.tester.get_port() + cls.RouterA = cls.tester.qdrouterd("RouterA", + Qdrouterd.Config([ + ('router', {'mode': 'interior', + 'id': 'RouterA'}), + ('listener', {'port': + cls.tester.get_port()}), + ('listener', {'role': + 'edge', + 'port': + ir_port})]), + wait=False) + cls.RouterB = cls.tester.qdrouterd("RouterB", + Qdrouterd.Config([ + ('router', {'mode': 'edge', + 'id': 'RouterB'}), + ('listener', {'port': + cls.tester.get_port()}), + ('connector', {'role': + 'edge', + 'port': + ir_port})]), + wait=True) + cls.RouterA.wait_ready() + mgmt = cls.RouterA.management + while True: + results = mgmt.query(type='org.apache.qpid.dispatch.connection', + attribute_names=['container']).get_dicts() + if len([c for c in results if c['container'] == 'RouterB']): + break + + def test_01_check_annotations(self): + """ + Verify the router annotations version + """ + a_logfile = self.RouterA.logfile_path + b_logfile = self.RouterB.logfile_path + self.RouterA.teardown() + self.RouterB.teardown() + + log_msg = "ROUTER (debug) Remote router annotations version: 1" + line = get_log_line(a_logfile, log_msg) + self.assertIsNotNone(line) + + line = get_log_line(b_logfile, log_msg) + self.assertIsNotNone(line) + + if __name__ == '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
