This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push: new c2250cd DISPATCH-2190: add SSL tests for qdstat and qdmanage c2250cd is described below commit c2250cd68ea21a5e21210fbfd65557284e0c80af Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Fri Jul 2 16:46:47 2021 -0400 DISPATCH-2190: add SSL tests for qdstat and qdmanage This closes #1282 --- tests/system_tests_qdmanage.py | 69 +++- tests/system_tests_qdstat.py | 778 ++++++++++++++++++++--------------------- 2 files changed, 448 insertions(+), 399 deletions(-) diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py index 2b02ca9..bf7ba39 100644 --- a/tests/system_tests_qdmanage.py +++ b/tests/system_tests_qdmanage.py @@ -48,6 +48,8 @@ class QdmanageTest(TestCase): def setUpClass(cls): super(QdmanageTest, cls).setUpClass() cls.inter_router_port = cls.tester.get_port() + cls.secure_port = cls.tester.get_port() + cls.secure_user_port = cls.tester.get_port() config_1 = Qdrouterd.Config([ ('router', {'mode': 'interior', 'id': 'R1'}), ('sslProfile', {'name': 'server-ssl', @@ -62,7 +64,15 @@ class QdmanageTest(TestCase): ('linkRoute', {'name': 'test-link-route', 'prefix': 'xyz', 'direction': 'in'}), ('autoLink', {'name': 'test-auto-link', 'address': 'mnop', 'direction': 'out'}), ('listener', {'port': cls.tester.get_port(), 'sslProfile': 'server-ssl'}), - ('address', {'name': 'pattern-address', 'pattern': 'a/*/b/#/c', 'distribution': 'closest'}) + ('address', {'name': 'pattern-address', 'pattern': 'a/*/b/#/c', 'distribution': 'closest'}), + + # for testing SSL + ('listener', {'host': 'localhost', 'port': cls.secure_port, + 'sslProfile': 'server-ssl', 'requireSsl': 'yes'}), + ('listener', {'host': 'localhost', 'port': cls.secure_user_port, + 'sslProfile': 'server-ssl', 'requireSsl': 'yes', + 'authenticatePeer': 'yes', + 'saslMechanisms': 'EXTERNAL'}) ]) config_2 = Qdrouterd.Config([ @@ -86,7 +96,7 @@ class QdmanageTest(TestCase): try: p.teardown() except Exception as e: - raise 
Exception(out if out else str(e)) + raise sys.exc_info()[0](out if out else str(e)) return out def assert_entity_equal(self, expect, actual, copy=None): @@ -164,7 +174,7 @@ class QdmanageTest(TestCase): self.assertIn(t, qall_types) qlistener = json.loads(self.run_qdmanage('query --type=listener')) - self.assertEqual([long_type('listener')] * 2, [e['type'] for e in qlistener]) + self.assertEqual([long_type('listener')] * 4, [e['type'] for e in qlistener]) self.assertEqual(self.router_1.ports[0], int(qlistener[0]['port'])) qattr = json.loads(self.run_qdmanage('query type name')) @@ -612,6 +622,59 @@ class QdmanageTest(TestCase): # is added self.assertTrue(mem is None) + def test_ssl_connection(self): + """Verify qdmanage can securely connect via SSL""" + ssl_address = "amqps://localhost:%s" % self.secure_port + ssl_user_address = "amqps://localhost:%s" % self.secure_user_port + query = 'QUERY --type org.apache.qpid.dispatch.router' + + # this should fail: no trustfile + with self.assertRaises(RuntimeError, + msg="failure expected: no trustfile") as exc: + self.run_qdmanage(query, address=ssl_address) + self.assertIn("certificate verify failed", str(exc.exception), + "unexpected exception: %s" % str(exc.exception)) + + # this should pass: + self.run_qdmanage(query + " --ssl-trustfile " + + self.ssl_file('ca-certificate.pem'), + address=ssl_address) + + # this should fail: wrong hostname + with self.assertRaises(RuntimeError, + msg="failure expected: wrong hostname") as exc: + self.run_qdmanage(query + " --ssl-trustfile " + + self.ssl_file('ca-certificate.pem'), + address="amqps://127.0.0.1:%s" % self.secure_port) + self.assertIn("certificate verify failed", str(exc.exception), + "unexpected exception: %s" % str(exc.exception)) + + # this should pass: disable hostname check: + self.run_qdmanage(query + " --ssl-trustfile " + + self.ssl_file('ca-certificate.pem') + + " --ssl-disable-peer-name-verify", + address="amqps://127.0.0.1:%s" % self.secure_port) + + # this should 
fail: router requires client to authenticate + with self.assertRaises(RuntimeError, + msg="client authentication should fail") as exc: + self.run_qdmanage(query + " --ssl-trustfile " + + self.ssl_file('ca-certificate.pem'), + address=ssl_user_address) + self.assertIn("SSL Failure", str(exc.exception), + "unexpected exception: %s" % str(exc.exception)) + + # this should pass: qdmanage provides credentials + self.run_qdmanage(query + " --ssl-trustfile " + + self.ssl_file('ca-certificate.pem') + + " --ssl-certificate " + + self.ssl_file('client-certificate.pem') + + " --ssl-password-file " + + self.ssl_file('client-password-file.txt') + + " --ssl-key " + + self.ssl_file('client-private-key.pem'), + address=ssl_user_address) + if __name__ == '__main__': unittest.main(main_module()) diff --git a/tests/system_tests_qdstat.py b/tests/system_tests_qdstat.py index 9258c63..157622e 100644 --- a/tests/system_tests_qdstat.py +++ b/tests/system_tests_qdstat.py @@ -19,49 +19,58 @@ import os import re -import system_test import unittest from subprocess import PIPE from proton import Url, SSLDomain, SSLUnavailable, SASL -from system_test import main_module +from system_test import main_module, TIMEOUT, TestCase, Qdrouterd, DIR from proton.utils import BlockingConnection -class QdstatTest(system_test.TestCase): +class QdstatTestBase(TestCase): + """Define run_qdstat for use with all tests""" + + def run_qdstat(self, args, address=None, regex=None): + if args is None: + args = [] + args = ['qdstat', + '--bus', str(address or self.address()), + '--timeout', str(TIMEOUT)] + args + + p = self.popen(args, name='qdstat-' + self.id(), stdout=PIPE, + expect=None, universal_newlines=True) + out, err = p.communicate() + + if p.returncode != 0: + raise RuntimeError("qdstat failed: %s (%s, %s)" % (p.returncode, + out, err)) + if regex is not None: + pattern = re.compile(regex, re.I) + self.assertRegex(out, pattern) + + return out + + def address(self): + """Can be overridden by tests""" + 
return self.router.addresses[0] + + +class QdstatTest(QdstatTestBase): """Test qdstat tool output""" @classmethod def setUpClass(cls): super(QdstatTest, cls).setUpClass() - config = system_test.Qdrouterd.Config([ + config = Qdrouterd.Config([ ('router', {'id': 'QDR.A', 'workerThreads': 1}), ('listener', {'port': cls.tester.get_port()}), ]) cls.router = cls.tester.qdrouterd('test-router', config) - def run_qdstat(self, args, regexp=None, address=None): - if args: - popen_args = ['qdstat', '--bus', str(address or self.router.addresses[0]), '--timeout', str(system_test.TIMEOUT)] + args - else: - popen_args = ['qdstat', '--bus', - str(address or self.router.addresses[0]), - '--timeout', str(system_test.TIMEOUT)] - - p = self.popen(popen_args, - name='qdstat-' + self.id(), stdout=PIPE, expect=None, - universal_newlines=True) - - out = p.communicate()[0] - assert p.returncode == 0, \ - "qdstat exit status %s, output:\n%s" % (p.returncode, out) - if regexp: - assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" % (regexp, out) - return out - def test_help(self): - self.run_qdstat(['--help'], r'Usage: qdstat') + self.run_qdstat(['--help'], regex=r'Usage: qdstat') def test_general(self): - out = self.run_qdstat(['--general'], r'(?s)Router Statistics.*Mode\s*Standalone') + out = self.run_qdstat(['--general'], + regex=r'(?s)Router Statistics.*Mode\s*Standalone') self.assertTrue(re.match(r"(.*)\bConnections\b[ \t]+\b1\b(.*)", out, flags=re.DOTALL) is not None, out) @@ -81,7 +90,9 @@ class QdstatTest(system_test.TestCase): self.assertEqual(out.count("QDR.A"), 2) def test_general_csv(self): - out = self.run_qdstat(['--general', '--csv'], r'(?s)Router Statistics.*Mode","Standalone') + out = self.run_qdstat(['--general', '--csv'], + regex=r'(?s)Router Statistics.*Mode","Standalone') + self.assertIn('"Connections","1"', out) self.assertIn('"Worker Threads","1"', out) self.assertIn('"Nodes","0"', out) @@ -92,24 +103,24 @@ class QdstatTest(system_test.TestCase): 
self.assertEqual(out.count("QDR.A"), 2) def test_connections(self): - self.run_qdstat(['--connections'], r'host.*container.*role') - outs = self.run_qdstat(['--connections'], 'no-auth') - outs = self.run_qdstat(['--connections'], 'QDR.A') + self.run_qdstat(['--connections'], regex=r'host.*container.*role') + outs = self.run_qdstat(['--connections'], regex=r'no-auth') + outs = self.run_qdstat(['--connections'], regex=r'QDR.A') def test_connections_csv(self): - self.run_qdstat(['--connections', "--csv"], r'host.*container.*role') - outs = self.run_qdstat(['--connections'], 'no-auth') - outs = self.run_qdstat(['--connections'], 'QDR.A') + self.run_qdstat(['--connections', "--csv"], regex=r'host.*container.*role') + outs = self.run_qdstat(['--connections'], regex=r'no-auth') + outs = self.run_qdstat(['--connections'], regex=r'QDR.A') def test_links(self): - self.run_qdstat(['--links'], r'QDR.A') - out = self.run_qdstat(['--links'], r'endpoint.*out.*local.*temp.') + self.run_qdstat(['--links'], regex=r'QDR.A') + out = self.run_qdstat(['--links'], regex=r'endpoint.*out.*local.*temp.') parts = out.split("\n") self.assertEqual(len(parts), 9) def test_links_csv(self): - self.run_qdstat(['--links', "--csv"], r'QDR.A') - out = self.run_qdstat(['--links'], r'endpoint.*out.*local.*temp.') + self.run_qdstat(['--links', "--csv"], regex=r'QDR.A') + out = self.run_qdstat(['--links'], regex=r'endpoint.*out.*local.*temp.') parts = out.split("\n") self.assertEqual(len(parts), 9) @@ -124,20 +135,20 @@ class QdstatTest(system_test.TestCase): self.assertEqual(len(parts), 7) def test_nodes(self): - self.run_qdstat(['--nodes'], r'No Router List') + self.run_qdstat(['--nodes'], regex=r'No Router List') def test_nodes_csv(self): - self.run_qdstat(['--nodes', "--csv"], r'No Router List') + self.run_qdstat(['--nodes', "--csv"], regex=r'No Router List') def test_address(self): - out = self.run_qdstat(['--address'], r'QDR.A') - out = self.run_qdstat(['--address'], r'\$management') + out = 
self.run_qdstat(['--address'], regex=r'QDR.A') + out = self.run_qdstat(['--address'], regex=r'\$management') parts = out.split("\n") self.assertEqual(len(parts), 11) def test_address_csv(self): - out = self.run_qdstat(['--address'], r'QDR.A') - out = self.run_qdstat(['--address'], r'\$management') + out = self.run_qdstat(['--address'], regex=r'QDR.A') + out = self.run_qdstat(['--address'], regex=r'\$management') parts = out.split("\n") self.assertEqual(len(parts), 11) @@ -261,7 +272,7 @@ class QdstatTest(system_test.TestCase): self.assertIn("Total Denials", out) def test_log(self): - self.run_qdstat(['--log', '--limit=5'], r'AGENT \(debug\).*GET-LOG') + self.run_qdstat(['--log', '--limit=5'], regex=r'AGENT \(debug\).*GET-LOG') def test_yy_query_many_links(self): # This test will fail without the fix for DISPATCH-974 @@ -450,12 +461,12 @@ class QdstatTest(system_test.TestCase): c.close() -class QdstatTestVhostPolicy(system_test.TestCase): +class QdstatTestVhostPolicy(QdstatTestBase): """Test qdstat-with-policy tool output""" @classmethod def setUpClass(cls): super(QdstatTestVhostPolicy, cls).setUpClass() - config = system_test.Qdrouterd.Config([ + config = Qdrouterd.Config([ ('router', {'id': 'QDR.A', 'workerThreads': 1}), ('listener', {'port': cls.tester.get_port()}), ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}), @@ -483,25 +494,6 @@ class QdstatTestVhostPolicy(system_test.TestCase): ]) cls.router = cls.tester.qdrouterd('test-router', config) - def run_qdstat(self, args, regexp=None, address=None): - if args: - popen_args = ['qdstat', '--bus', str(address or self.router.addresses[0]), '--timeout', str(system_test.TIMEOUT)] + args - else: - popen_args = ['qdstat', '--bus', - str(address or self.router.addresses[0]), - '--timeout', str(system_test.TIMEOUT)] - - p = self.popen(popen_args, - name='qdstat-' + self.id(), stdout=PIPE, expect=None, - universal_newlines=True) - - out = p.communicate()[0] - assert p.returncode == 0, \ - "qdstat exit 
status %s, output:\n%s" % (p.returncode, out) - if regexp: - assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" % (regexp, out) - return out - def test_vhost(self): out = self.run_qdstat(['--vhosts']) self.assertIn("Vhosts", out) @@ -541,19 +533,19 @@ class QdstatTestVhostPolicy(system_test.TestCase): self.assertIn("remote hosts", out) -class QdstatLinkPriorityTest(system_test.TestCase): +class QdstatLinkPriorityTest(QdstatTestBase): """Need 2 routers to get inter-router links for the link priority test""" @classmethod def setUpClass(cls): super(QdstatLinkPriorityTest, cls).setUpClass() cls.inter_router_port = cls.tester.get_port() - config_1 = system_test.Qdrouterd.Config([ + config_1 = Qdrouterd.Config([ ('router', {'mode': 'interior', 'id': 'R1'}), ('listener', {'port': cls.tester.get_port()}), ('connector', {'role': 'inter-router', 'port': cls.inter_router_port}) ]) - config_2 = system_test.Qdrouterd.Config([ + config_2 = Qdrouterd.Config([ ('router', {'mode': 'interior', 'id': 'R2'}), ('listener', {'role': 'inter-router', 'port': cls.inter_router_port}), ]) @@ -565,17 +557,6 @@ class QdstatLinkPriorityTest(system_test.TestCase): def address(self): return self.router_1.addresses[0] - def run_qdstat(self, args): - p = self.popen( - ['qdstat', '--bus', str(self.address()), '--timeout', str(system_test.TIMEOUT)] + args, - name='qdstat-' + self.id(), stdout=PIPE, expect=None, - universal_newlines=True) - - out = p.communicate()[0] - assert p.returncode == 0, \ - "qdstat exit status %s, output:\n%s" % (p.returncode, out) - return out - def test_link_priority(self): out = self.run_qdstat(['--links']) lines = out.split("\n") @@ -691,327 +672,332 @@ class QdstatLinkPriorityTest(system_test.TestCase): self._test_all_entities_all_routers(['--all-entities', '--csv', '--all-routers']) -try: - SSLDomain(SSLDomain.MODE_CLIENT) - - class QdstatSslTest(system_test.TestCase): - """Test qdstat tool output""" - - @staticmethod - def ssl_file(name): - return 
os.path.join(system_test.DIR, 'ssl_certs', name) - - @staticmethod - def sasl_path(): - return os.path.join(system_test.DIR, 'sasl_configs') - - @classmethod - def setUpClass(cls): - super(QdstatSslTest, cls).setUpClass() - # Write SASL configuration file: - with open('tests-mech-EXTERNAL.conf', 'w') as sasl_conf: - sasl_conf.write("mech_list: EXTERNAL ANONYMOUS DIGEST-MD5 PLAIN\n") - # qdrouterd configuration: - config = system_test.Qdrouterd.Config([ - ('router', {'id': 'QDR.B', - 'saslConfigPath': os.getcwd(), - 'workerThreads': 1, - 'saslConfigName': 'tests-mech-EXTERNAL'}), - ('sslProfile', {'name': 'server-ssl', - 'caCertFile': cls.ssl_file('ca-certificate.pem'), - 'certFile': cls.ssl_file('server-certificate.pem'), - 'privateKeyFile': cls.ssl_file('server-private-key.pem'), - 'password': 'server-password'}), - ('listener', {'host': 'localhost', 'port': cls.tester.get_port()}), - ('listener', {'host': 'localhost', 'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', - 'authenticatePeer': 'no', 'requireSsl': 'yes'}), - ('listener', {'host': 'localhost', 'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', - 'authenticatePeer': 'no', 'requireSsl': 'no'}), - ('listener', {'host': 'localhost', 'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', - 'authenticatePeer': 'yes', 'requireSsl': 'yes', - 'saslMechanisms': 'EXTERNAL'}) - ]) - cls.router = cls.tester.qdrouterd('test-router', config) - - def run_qdstat(self, args, regexp=None, address=None): - p = self.popen( - ['qdstat', '--bus', str(address or self.router.addresses[0]), - '--timeout', str(system_test.TIMEOUT)] + args, - name='qdstat-' + self.id(), stdout=PIPE, expect=None, - universal_newlines=True) - - out = p.communicate()[0] - assert p.returncode == 0, \ - "qdstat exit status %s, output:\n%s" % (p.returncode, out) - if regexp: - assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" % (regexp, out) - return out - - def get_ssl_args(self): - args = dict( - 
sasl_external=['--sasl-mechanisms', 'EXTERNAL'], - trustfile=['--ssl-trustfile', self.ssl_file('ca-certificate.pem')], - bad_trustfile=['--ssl-trustfile', self.ssl_file('bad-ca-certificate.pem')], - client_cert=['--ssl-certificate', self.ssl_file('client-certificate.pem')], - client_key=['--ssl-key', self.ssl_file('client-private-key.pem')], - client_pass=['--ssl-password', 'client-password']) - args['client_cert_all'] = args['client_cert'] + args['client_key'] + args['client_pass'] - - return args - - def ssl_test(self, url_name, arg_names): - """Run simple SSL connection test with supplied parameters. - See test_ssl_* below. - """ - args = self.get_ssl_args() - addrs = [self.router.addresses[i] for i in range(4)] - urls = dict(zip(['none', 'strict', 'unsecured', 'auth'], addrs)) - urls.update(zip(['none_s', 'strict_s', 'unsecured_s', 'auth_s'], - (Url(a, scheme="amqps") for a in addrs))) - self.run_qdstat(['--general'] + sum([args[n] for n in arg_names], []), - regexp=r'(?s)Router Statistics.*Mode\s*Standalone', - address=str(urls[url_name])) - - def ssl_test_bad(self, url_name, arg_names): - self.assertRaises(AssertionError, self.ssl_test, url_name, arg_names) - - # qdstat -b amqp://localhost:<port> --general and makes sure - # the router sends back a valid response. - def test_ssl_none(self): - self.ssl_test('none', []) - - # qdstat -b amqps://localhost:<port> --general - # Make sure that the command fails. - def test_ssl_scheme_to_none(self): - self.ssl_test_bad('none_s', []) - - # qdstat -b amqp://localhost:<port> --general --ssl-certificate /path/to/client-certificate.pem - # Makes sure the command fails. - def test_ssl_cert_to_none(self): - self.ssl_test_bad('none', ['client_cert']) - - # Tries to run the following command on a listener that requires SSL (requireSsl:yes) - # qdstat -b amqp://localhost:<port> --general - # Makes sure the command fails. 
- def test_ssl_none_to_strict(self): - self.ssl_test_bad('strict', []) - - # qdstat -b amqps://localhost:<port> --general - def test_ssl_schema_to_strict(self): - self.ssl_test_bad('strict_s', []) - - # qdstat -b amqps://localhost:<port> --general --ssl-certificate /path/to/client-certificate.pem - # --ssl-key /path/to/client-private-key.pem --ssl-password client-password' - def test_ssl_cert_to_strict(self): - self.ssl_test_bad('strict_s', ['client_cert_all']) - - # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem - def test_ssl_trustfile_to_strict(self): - self.ssl_test('strict_s', ['trustfile']) - - # qdstat -b amqps://localhost:<port> --general --ssl-trustfile - # /path/to/ca-certificate.pem --ssl-certificate /path/to/client-certificate.pem - # --ssl-key /path/to/client-private-key.pem --ssl-password client-password - def test_ssl_trustfile_cert_to_strict(self): - self.ssl_test('strict_s', ['trustfile', 'client_cert_all']) - - # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/bad-ca-certificate.pem - # Send in a bad ca cert and make sure the test fails. - def test_ssl_bad_trustfile_to_strict(self): - self.ssl_test_bad('strict_s', ['bad_trustfile']) - - # Require-auth SSL listener - # qdstat -b amqp://localhost:<port> --general - # Send in no certs to a 'authenticatePeer': 'yes', 'requireSsl': 'yes' listener and make sure it fails. - # Also protocol is amqp not amqps - def test_ssl_none_to_auth(self): - self.ssl_test_bad('auth', []) - - # qdstat -b amqps://localhost:28491 --general - # Send in no certs to a 'authenticatePeer': 'yes', 'requireSsl': 'yes' listener and make sure it fails. - def test_ssl_schema_to_auth(self): - self.ssl_test_bad('auth_s', []) - - # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem' - # Send in just a trustfile to an 'authenticatePeer': 'yes', 'requireSsl': 'yes' listener and make sure it fails. 
- def test_ssl_trustfile_to_auth(self): - self.ssl_test_bad('auth_s', ['trustfile']) - - # qdstat -b amqps://localhost:<port> --general --ssl-certificate /path/to/client-certificate.pem - # --ssl-key /path/to/client-private-key.pem --ssl-password client-password - # Without a trustfile, this test fails - def test_ssl_cert_to_auth(self): - self.ssl_test_bad('auth_s', ['client_cert_all']) - - # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem - # --ssl-certificate /path/to/client-certificate.pem - # --ssl-key /path/to/client-private-key.pem --ssl-password client-password - # This has everything, the test should pass. - def test_ssl_trustfile_cert_to_auth(self): - self.ssl_test('auth_s', ['trustfile', 'client_cert_all']) - - # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/bad-ca-certificate.pem - # --ssl-certificate /path/to/client-certificate.pem --ssl-key /path/to/client-private-key.pem - # --ssl-password client-password - # Bad trustfile should be rejected. 
- def test_ssl_bad_trustfile_to_auth(self): - self.ssl_test_bad('auth_s', ['bad_trustfile', 'client_cert_all']) - - # qdstat -b amqps://localhost:<port> --general --sasl-mechanisms EXTERNAL - # --ssl-certificate /path/to/client-certificate.pem --ssl-key /path/to/client-private-key.pem - # --ssl-password client-password --ssl-trustfile /path/to/ca-certificate.pem' - def test_ssl_cert_explicit_external_to_auth(self): - self.ssl_test('auth_s', ['sasl_external', 'client_cert_all', 'trustfile']) - - # Unsecured SSL listener, allows non-SSL - # qdstat -b amqp://localhost:<port> --general - def test_ssl_none_to_unsecured(self): - self.ssl_test('unsecured', []) - - # qdstat -b amqps://localhost:<port> --general - def test_ssl_schema_to_unsecured(self): - self.ssl_test_bad('unsecured_s', []) - - # qdstat -b amqps://localhost:<port> --general --ssl-certificate /path/to/client-certificate.pem --ssl-key - # /path/to/client-private-key.pem --ssl-password client-password - # A trustfile is required, test will fail - def test_ssl_cert_to_unsecured(self): - self.ssl_test_bad('unsecured_s', ['client_cert_all']) - - # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem' - # Just send in the trustfile, should be all good. - def test_ssl_trustfile_to_unsecured(self): - self.ssl_test('unsecured_s', ['trustfile']) - - # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem - # --ssl-certificate /path/to/client-certificate.pem --ssl-key /path/to/client-private-key.pem - # --ssl-password client-password - # We have everything, this should work. - def test_ssl_trustfile_cert_to_unsecured(self): - self.ssl_test('unsecured_s', ['trustfile', 'client_cert_all']) - - # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/bad-ca-certificate.pem'] - # Bad trustfile, test will fail. 
- def test_ssl_bad_trustfile_to_unsecured(self): - self.ssl_test_bad('unsecured_s', ['bad_trustfile']) - -except SSLUnavailable: - class QdstatSslTest(system_test.TestCase): - def test_skip(self): - self.skipTest("Proton SSL support unavailable.") - -try: - SSLDomain(SSLDomain.MODE_CLIENT) - - class QdstatSslTestSslPasswordFile(QdstatSslTest): +def _has_ssl(): + try: + SSLDomain(SSLDomain.MODE_CLIENT) + return True + except SSLUnavailable: + return False + + +@unittest.skipIf(_has_ssl() is False, "Proton SSL support unavailable") +class QdstatSslTest(QdstatTestBase): + """Test qdstat tool output""" + + @staticmethod + def ssl_file(name): + return os.path.join(DIR, 'ssl_certs', name) + + @staticmethod + def sasl_path(): + return os.path.join(DIR, 'sasl_configs') + + @classmethod + def setUpClass(cls): + super(QdstatSslTest, cls).setUpClass() + # Write SASL configuration file: + with open('tests-mech-EXTERNAL.conf', 'w') as sasl_conf: + sasl_conf.write("mech_list: EXTERNAL ANONYMOUS DIGEST-MD5 PLAIN\n") + # qdrouterd configuration. 
Note well: modifying the order of + # listeners will require updating ssl_test() + cls.strict_port = cls.tester.get_port() + config = Qdrouterd.Config([ + ('router', {'id': 'QDR.B', + 'saslConfigPath': os.getcwd(), + 'workerThreads': 1, + 'saslConfigName': 'tests-mech-EXTERNAL'}), + ('sslProfile', {'name': 'server-ssl', + 'caCertFile': cls.ssl_file('ca-certificate.pem'), + 'certFile': cls.ssl_file('server-certificate.pem'), + 'privateKeyFile': cls.ssl_file('server-private-key.pem'), + 'password': 'server-password'}), + + # 'none', 'none_s': + ('listener', {'host': 'localhost', 'port': cls.tester.get_port()}), + + # 'strict', 'strict_s': + ('listener', {'host': 'localhost', 'port': cls.strict_port, 'sslProfile': 'server-ssl', + 'authenticatePeer': 'no', 'requireSsl': 'yes'}), + + # 'unsecured', 'unsecured_s': + ('listener', {'host': 'localhost', 'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', + 'authenticatePeer': 'no', 'requireSsl': 'no'}), + + # 'auth', 'auth_s': + ('listener', {'host': 'localhost', 'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', + 'authenticatePeer': 'yes', 'requireSsl': 'yes', + 'saslMechanisms': 'EXTERNAL'}) + ]) + cls.router = cls.tester.qdrouterd('test-router', config) + + def get_ssl_args(self): + """A map of short names to the corresponding qdstat arguments. A list + of keys are passed to ssl_test() via the arg_names parameter list. 
""" - Tests the --ssl-password-file command line parameter + args = dict( + sasl_external=['--sasl-mechanisms', 'EXTERNAL'], + trustfile=['--ssl-trustfile', self.ssl_file('ca-certificate.pem')], + bad_trustfile=['--ssl-trustfile', self.ssl_file('bad-ca-certificate.pem')], + client_cert=['--ssl-certificate', self.ssl_file('client-certificate.pem')], + client_key=['--ssl-key', self.ssl_file('client-private-key.pem')], + client_pass=['--ssl-password', 'client-password']) + args['client_cert_all'] = args['client_cert'] + args['client_key'] + args['client_pass'] + + return args + + def ssl_test(self, url_name, arg_names): + """Run simple SSL connection test with supplied parameters. + + :param url_name: a shorthand name used to select which router + interface qdstat will connect to: + 'none' = No SSL or SASL config + 'strict' = Require SSL, No SASL config + 'unsecured' = SSL not required, SASL not required + 'auth' = Require SASL (external) and SSL + + The '*_s' names simply replace 'amqp:' with 'amqps:' in the + generated URL. + + See test_ssl_* below. """ + args = self.get_ssl_args() + addrs = [self.router.addresses[i] for i in range(4)] + urls = dict(zip(['none', 'strict', 'unsecured', 'auth'], addrs)) + urls.update(zip(['none_s', 'strict_s', 'unsecured_s', 'auth_s'], + (Url(a, scheme="amqps") for a in addrs))) + self.run_qdstat(['--general'] + sum([args[n] for n in arg_names], []), + address=str(urls[url_name]), + regex=r'(?s)Router Statistics.*Mode\s*Standalone') + + def ssl_test_bad(self, url_name, arg_names): + self.assertRaises(RuntimeError, self.ssl_test, url_name, arg_names) + + # qdstat -b amqp://localhost:<port> --general and makes sure + # the router sends back a valid response. + def test_ssl_none(self): + self.ssl_test('none', []) + + # qdstat -b amqps://localhost:<port> --general + # Make sure that the command fails. 
+ def test_ssl_scheme_to_none(self): + self.ssl_test_bad('none_s', []) + + # qdstat -b amqp://localhost:<port> --general --ssl-certificate /path/to/client-certificate.pem + # Makes sure the command fails. + def test_ssl_cert_to_none(self): + self.ssl_test_bad('none', ['client_cert']) + + # Tries to run the following command on a listener that requires SSL (requireSsl:yes) + # qdstat -b amqp://localhost:<port> --general + # Makes sure the command fails. + def test_ssl_none_to_strict(self): + self.ssl_test_bad('strict', []) + + # qdstat -b amqps://localhost:<port> --general + def test_ssl_schema_to_strict(self): + self.ssl_test_bad('strict_s', []) + + # qdstat -b amqps://localhost:<port> --general --ssl-certificate /path/to/client-certificate.pem + # --ssl-key /path/to/client-private-key.pem --ssl-password client-password' + def test_ssl_cert_to_strict(self): + self.ssl_test_bad('strict_s', ['client_cert_all']) + + # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem + def test_ssl_trustfile_to_strict(self): + self.ssl_test('strict_s', ['trustfile']) + + # qdstat -b amqps://localhost:<port> --general --ssl-trustfile + # /path/to/ca-certificate.pem --ssl-certificate /path/to/client-certificate.pem + # --ssl-key /path/to/client-private-key.pem --ssl-password client-password + def test_ssl_trustfile_cert_to_strict(self): + self.ssl_test('strict_s', ['trustfile', 'client_cert_all']) + + # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/bad-ca-certificate.pem + # Send in a bad ca cert and make sure the test fails. + def test_ssl_bad_trustfile_to_strict(self): + self.ssl_test_bad('strict_s', ['bad_trustfile']) + + # Require-auth SSL listener + # qdstat -b amqp://localhost:<port> --general + # Send in no certs to a 'authenticatePeer': 'yes', 'requireSsl': 'yes' listener and make sure it fails. 
+ # Also protocol is amqp not amqps + def test_ssl_none_to_auth(self): + self.ssl_test_bad('auth', []) + + # qdstat -b amqps://localhost:28491 --general + # Send in no certs to a 'authenticatePeer': 'yes', 'requireSsl': 'yes' listener and make sure it fails. + def test_ssl_schema_to_auth(self): + self.ssl_test_bad('auth_s', []) + + # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem' + # Send in just a trustfile to an 'authenticatePeer': 'yes', 'requireSsl': 'yes' listener and make sure it fails. + def test_ssl_trustfile_to_auth(self): + self.ssl_test_bad('auth_s', ['trustfile']) + + # qdstat -b amqps://localhost:<port> --general --ssl-certificate /path/to/client-certificate.pem + # --ssl-key /path/to/client-private-key.pem --ssl-password client-password + # Without a trustfile, this test fails + def test_ssl_cert_to_auth(self): + self.ssl_test_bad('auth_s', ['client_cert_all']) + + # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem + # --ssl-certificate /path/to/client-certificate.pem + # --ssl-key /path/to/client-private-key.pem --ssl-password client-password + # This has everything, the test should pass. + def test_ssl_trustfile_cert_to_auth(self): + self.ssl_test('auth_s', ['trustfile', 'client_cert_all']) + + # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/bad-ca-certificate.pem + # --ssl-certificate /path/to/client-certificate.pem --ssl-key /path/to/client-private-key.pem + # --ssl-password client-password + # Bad trustfile should be rejected. 
+ def test_ssl_bad_trustfile_to_auth(self): + self.ssl_test_bad('auth_s', ['bad_trustfile', 'client_cert_all']) + + # qdstat -b amqps://localhost:<port> --general --sasl-mechanisms EXTERNAL + # --ssl-certificate /path/to/client-certificate.pem --ssl-key /path/to/client-private-key.pem + # --ssl-password client-password --ssl-trustfile /path/to/ca-certificate.pem' + def test_ssl_cert_explicit_external_to_auth(self): + self.ssl_test('auth_s', ['sasl_external', 'client_cert_all', 'trustfile']) + + # Unsecured SSL listener, allows non-SSL + # qdstat -b amqp://localhost:<port> --general + def test_ssl_none_to_unsecured(self): + self.ssl_test('unsecured', []) + + # qdstat -b amqps://localhost:<port> --general + def test_ssl_schema_to_unsecured(self): + self.ssl_test_bad('unsecured_s', []) + + # qdstat -b amqps://localhost:<port> --general --ssl-certificate /path/to/client-certificate.pem --ssl-key + # /path/to/client-private-key.pem --ssl-password client-password + # A trustfile is required, test will fail + def test_ssl_cert_to_unsecured(self): + self.ssl_test_bad('unsecured_s', ['client_cert_all']) + + # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem' + # Just send in the trustfile, should be all good. + def test_ssl_trustfile_to_unsecured(self): + self.ssl_test('unsecured_s', ['trustfile']) + + # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/ca-certificate.pem + # --ssl-certificate /path/to/client-certificate.pem --ssl-key /path/to/client-private-key.pem + # --ssl-password client-password + # We have everything, this should work. + def test_ssl_trustfile_cert_to_unsecured(self): + self.ssl_test('unsecured_s', ['trustfile', 'client_cert_all']) + + # qdstat -b amqps://localhost:<port> --general --ssl-trustfile /path/to/bad-ca-certificate.pem'] + # Bad trustfile, test will fail. 
+ def test_ssl_bad_trustfile_to_unsecured(self): + self.ssl_test_bad('unsecured_s', ['bad_trustfile']) + + def test_ssl_peer_name_verify_disabled(self): + """Verify the --ssl-disable-peer-name-verify option""" + params = [x for x in self.get_ssl_args()['trustfile']] + + # Expect the connection to fail, since the certificate has + # 'localhost' as the peer name and we used '127.0.0.1' instead. + + with self.assertRaises(RuntimeError, + msg="expected fail: host name wrong") as exc: + self.run_qdstat(address="amqps://127.0.0.1:%s" % self.strict_port, + args=['--general'] + params) + + # Repeat the same operation but using + # --ssl-disable-peer-name-verify. This should succeed: + + self.run_qdstat(address="amqps://127.0.0.1:%s" % self.strict_port, + args=['--general', + '--ssl-disable-peer-name-verify'] + params) + + +@unittest.skipIf(_has_ssl() is False, "Proton SSL support unavailable") +class QdstatSslTestSslPasswordFile(QdstatSslTest): + """ + Tests the --ssl-password-file command line parameter + """ + + def get_ssl_args(self): + args = dict( + sasl_external=['--sasl-mechanisms', 'EXTERNAL'], + trustfile=['--ssl-trustfile', self.ssl_file('ca-certificate.pem')], + bad_trustfile=['--ssl-trustfile', self.ssl_file('bad-ca-certificate.pem')], + client_cert=['--ssl-certificate', self.ssl_file('client-certificate.pem')], + client_key=['--ssl-key', self.ssl_file('client-private-key.pem')], + client_pass=['--ssl-password-file', self.ssl_file('client-password-file.txt')]) + args['client_cert_all'] = args['client_cert'] + args['client_key'] + args['client_pass'] + + return args + + +@unittest.skipIf(_has_ssl() is False, "Proton SSL support unavailable") +class QdstatSslNoExternalTest(QdstatTestBase): + """Test qdstat can't connect without sasl_mech EXTERNAL""" + + @staticmethod + def ssl_file(name): + return os.path.join(DIR, 'ssl_certs', name) + + @staticmethod + def sasl_path(): + return os.path.join(DIR, 'sasl_configs') + + @classmethod + def setUpClass(cls): + 
super(QdstatSslNoExternalTest, cls).setUpClass() + # Write SASL configuration file: + with open('tests-mech-NOEXTERNAL.conf', 'w') as sasl_conf: + sasl_conf.write("mech_list: ANONYMOUS DIGEST-MD5 PLAIN\n") + # qdrouterd configuration: + config = Qdrouterd.Config([ + ('router', {'id': 'QDR.C', + 'saslConfigPath': os.getcwd(), + 'workerThreads': 1, + 'saslConfigName': 'tests-mech-NOEXTERNAL'}), + ('sslProfile', {'name': 'server-ssl', + 'caCertFile': cls.ssl_file('ca-certificate.pem'), + 'certFile': cls.ssl_file('server-certificate.pem'), + 'privateKeyFile': cls.ssl_file('server-private-key.pem'), + 'password': 'server-password'}), + ('listener', {'port': cls.tester.get_port()}), + ('listener', {'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', 'authenticatePeer': 'no', 'requireSsl': 'yes'}), + ('listener', {'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', 'authenticatePeer': 'no', 'requireSsl': 'no'}), + ('listener', {'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', 'authenticatePeer': 'yes', 'requireSsl': 'yes', + 'saslMechanisms': 'EXTERNAL'}) + ]) + cls.router = cls.tester.qdrouterd('test-router', config) - def get_ssl_args(self): - args = dict( - sasl_external=['--sasl-mechanisms', 'EXTERNAL'], - trustfile=['--ssl-trustfile', self.ssl_file('ca-certificate.pem')], - bad_trustfile=['--ssl-trustfile', self.ssl_file('bad-ca-certificate.pem')], - client_cert=['--ssl-certificate', self.ssl_file('client-certificate.pem')], - client_key=['--ssl-key', self.ssl_file('client-private-key.pem')], - client_pass=['--ssl-password-file', self.ssl_file('client-password-file.txt')]) - args['client_cert_all'] = args['client_cert'] + args['client_key'] + args['client_pass'] - - return args - -except SSLUnavailable: - class QdstatSslTest(system_test.TestCase): - def test_skip(self): - self.skipTest("Proton SSL support unavailable.") - -try: - SSLDomain(SSLDomain.MODE_CLIENT) - - class QdstatSslNoExternalTest(system_test.TestCase): - """Test qdstat can't 
connect without sasl_mech EXTERNAL""" - - @staticmethod - def ssl_file(name): - return os.path.join(system_test.DIR, 'ssl_certs', name) - - @staticmethod - def sasl_path(): - return os.path.join(system_test.DIR, 'sasl_configs') - - @classmethod - def setUpClass(cls): - super(QdstatSslNoExternalTest, cls).setUpClass() - # Write SASL configuration file: - with open('tests-mech-NOEXTERNAL.conf', 'w') as sasl_conf: - sasl_conf.write("mech_list: ANONYMOUS DIGEST-MD5 PLAIN\n") - # qdrouterd configuration: - config = system_test.Qdrouterd.Config([ - ('router', {'id': 'QDR.C', - 'saslConfigPath': os.getcwd(), - 'workerThreads': 1, - 'saslConfigName': 'tests-mech-NOEXTERNAL'}), - ('sslProfile', {'name': 'server-ssl', - 'caCertFile': cls.ssl_file('ca-certificate.pem'), - 'certFile': cls.ssl_file('server-certificate.pem'), - 'privateKeyFile': cls.ssl_file('server-private-key.pem'), - 'password': 'server-password'}), - ('listener', {'port': cls.tester.get_port()}), - ('listener', {'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', 'authenticatePeer': 'no', 'requireSsl': 'yes'}), - ('listener', {'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', 'authenticatePeer': 'no', 'requireSsl': 'no'}), - ('listener', {'port': cls.tester.get_port(), 'sslProfile': 'server-ssl', 'authenticatePeer': 'yes', 'requireSsl': 'yes', - 'saslMechanisms': 'EXTERNAL'}) - ]) - cls.router = cls.tester.qdrouterd('test-router', config) - - def run_qdstat(self, args, regexp=None, address=None): - p = self.popen( - ['qdstat', '--bus', str(address or self.router.addresses[0]), '--timeout', str(system_test.TIMEOUT)] + args, - name='qdstat-' + self.id(), stdout=PIPE, expect=None, - universal_newlines=True) - out = p.communicate()[0] - assert p.returncode == 0, \ - "qdstat exit status %s, output:\n%s" % (p.returncode, out) - if regexp: - assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" % (regexp, out) - return out - - def ssl_test(self, url_name, arg_names): - """Run simple SSL 
connection test with supplied parameters. - See test_ssl_* below. - """ - args = dict( - trustfile=['--ssl-trustfile', self.ssl_file('ca-certificate.pem')], - bad_trustfile=['--ssl-trustfile', self.ssl_file('bad-ca-certificate.pem')], - client_cert=['--ssl-certificate', self.ssl_file('client-certificate.pem')], - client_key=['--ssl-key', self.ssl_file('client-private-key.pem')], - client_pass=['--ssl-password', 'client-password']) - args['client_cert_all'] = args['client_cert'] + args['client_key'] + args['client_pass'] - - addrs = [self.router.addresses[i] for i in range(4)] - urls = dict(zip(['none', 'strict', 'unsecured', 'auth'], addrs)) - urls.update(zip(['none_s', 'strict_s', 'unsecured_s', 'auth_s'], - (Url(a, scheme="amqps") for a in addrs))) - - self.run_qdstat(['--general'] + sum([args[n] for n in arg_names], []), - regexp=r'(?s)Router Statistics.*Mode\s*Standalone', - address=str(urls[url_name])) - - def ssl_test_bad(self, url_name, arg_names): - self.assertRaises(AssertionError, self.ssl_test, url_name, arg_names) - - @unittest.skipIf(not SASL.extended(), "Cyrus library not available. skipping test") - def test_ssl_cert_to_auth_fail_no_sasl_external(self): - self.ssl_test_bad('auth_s', ['client_cert_all']) - - def test_ssl_trustfile_cert_to_auth_fail_no_sasl_external(self): - self.ssl_test_bad('auth_s', ['trustfile', 'client_cert_all']) - - -except SSLUnavailable: - class QdstatSslTest(system_test.TestCase): - def test_skip(self): - self.skipTest("Proton SSL support unavailable.") + def ssl_test(self, url_name, arg_names): + """Run simple SSL connection test with supplied parameters. + See test_ssl_* below. 
+ """ + args = dict( + trustfile=['--ssl-trustfile', self.ssl_file('ca-certificate.pem')], + bad_trustfile=['--ssl-trustfile', self.ssl_file('bad-ca-certificate.pem')], + client_cert=['--ssl-certificate', self.ssl_file('client-certificate.pem')], + client_key=['--ssl-key', self.ssl_file('client-private-key.pem')], + client_pass=['--ssl-password', 'client-password']) + args['client_cert_all'] = args['client_cert'] + args['client_key'] + args['client_pass'] + + addrs = [self.router.addresses[i] for i in range(4)] + urls = dict(zip(['none', 'strict', 'unsecured', 'auth'], addrs)) + urls.update(zip(['none_s', 'strict_s', 'unsecured_s', 'auth_s'], + (Url(a, scheme="amqps") for a in addrs))) + + self.run_qdstat(['--general'] + sum([args[n] for n in arg_names], []), + address=str(urls[url_name]), + regex=r'(?s)Router Statistics.*Mode\s*Standalone') + + def ssl_test_bad(self, url_name, arg_names): + self.assertRaises(RuntimeError, self.ssl_test, url_name, arg_names) + + @unittest.skipIf(not SASL.extended(), "Cyrus library not available. skipping test") + def test_ssl_cert_to_auth_fail_no_sasl_external(self): + self.ssl_test_bad('auth_s', ['client_cert_all']) + + def test_ssl_trustfile_cert_to_auth_fail_no_sasl_external(self): + self.ssl_test_bad('auth_s', ['trustfile', 'client_cert_all']) if __name__ == '__main__': --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org