This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new c2250cd  DISPATCH-2190: add SSL tests for qdstat and qdmanage
c2250cd is described below

commit c2250cd68ea21a5e21210fbfd65557284e0c80af
Author: Kenneth Giusti <kgiu...@apache.org>
AuthorDate: Fri Jul 2 16:46:47 2021 -0400

    DISPATCH-2190: add SSL tests for qdstat and qdmanage
    
    This closes #1282
---
 tests/system_tests_qdmanage.py |  69 +++-
 tests/system_tests_qdstat.py   | 778 ++++++++++++++++++++---------------------
 2 files changed, 448 insertions(+), 399 deletions(-)

diff --git a/tests/system_tests_qdmanage.py b/tests/system_tests_qdmanage.py
index 2b02ca9..bf7ba39 100644
--- a/tests/system_tests_qdmanage.py
+++ b/tests/system_tests_qdmanage.py
@@ -48,6 +48,8 @@ class QdmanageTest(TestCase):
     def setUpClass(cls):
         super(QdmanageTest, cls).setUpClass()
         cls.inter_router_port = cls.tester.get_port()
+        cls.secure_port = cls.tester.get_port()
+        cls.secure_user_port = cls.tester.get_port()
         config_1 = Qdrouterd.Config([
             ('router', {'mode': 'interior', 'id': 'R1'}),
             ('sslProfile', {'name': 'server-ssl',
@@ -62,7 +64,15 @@ class QdmanageTest(TestCase):
             ('linkRoute', {'name': 'test-link-route', 'prefix': 'xyz', 
'direction': 'in'}),
             ('autoLink', {'name': 'test-auto-link', 'address': 'mnop', 
'direction': 'out'}),
             ('listener', {'port': cls.tester.get_port(), 'sslProfile': 
'server-ssl'}),
-            ('address', {'name': 'pattern-address', 'pattern': 'a/*/b/#/c', 'distribution': 'closest'})
+            ('address', {'name': 'pattern-address', 'pattern': 'a/*/b/#/c', 'distribution': 'closest'}),
+
+            # for testing SSL
+            ('listener', {'host': 'localhost', 'port': cls.secure_port,
+                          'sslProfile': 'server-ssl', 'requireSsl': 'yes'}),
+            ('listener', {'host': 'localhost', 'port': cls.secure_user_port,
+                          'sslProfile': 'server-ssl', 'requireSsl': 'yes',
+                          'authenticatePeer': 'yes',
+                          'saslMechanisms': 'EXTERNAL'})
         ])
 
         config_2 = Qdrouterd.Config([
@@ -86,7 +96,7 @@ class QdmanageTest(TestCase):
         try:
             p.teardown()
         except Exception as e:
-            raise Exception(out if out else str(e))
+            raise sys.exc_info()[0](out if out else str(e))
         return out
 
     def assert_entity_equal(self, expect, actual, copy=None):
@@ -164,7 +174,7 @@ class QdmanageTest(TestCase):
             self.assertIn(t, qall_types)
 
         qlistener = json.loads(self.run_qdmanage('query --type=listener'))
-        self.assertEqual([long_type('listener')] * 2, [e['type'] for e in qlistener])
+        self.assertEqual([long_type('listener')] * 4, [e['type'] for e in qlistener])
         self.assertEqual(self.router_1.ports[0], int(qlistener[0]['port']))
 
         qattr = json.loads(self.run_qdmanage('query type name'))
@@ -612,6 +622,59 @@ class QdmanageTest(TestCase):
             # is added
             self.assertTrue(mem is None)
 
+    def test_ssl_connection(self):
+        """Verify qdmanage can securely connect via SSL"""
+        ssl_address = "amqps://localhost:%s" % self.secure_port
+        ssl_user_address = "amqps://localhost:%s" % self.secure_user_port
+        query = 'QUERY --type org.apache.qpid.dispatch.router'
+
+        # this should fail: no trustfile
+        with self.assertRaises(RuntimeError,
+                               msg="failure expected: no trustfile") as exc:
+            self.run_qdmanage(query, address=ssl_address)
+        self.assertIn("certificate verify failed", str(exc.exception),
+                      "unexpected exception: %s" % str(exc.exception))
+
+        # this should pass:
+        self.run_qdmanage(query + " --ssl-trustfile " +
+                          self.ssl_file('ca-certificate.pem'),
+                          address=ssl_address)
+
+        # this should fail: wrong hostname
+        with self.assertRaises(RuntimeError,
+                               msg="failure expected: wrong hostname") as exc:
+            self.run_qdmanage(query + " --ssl-trustfile " +
+                              self.ssl_file('ca-certificate.pem'),
+                              address="amqps://127.0.0.1:%s" % self.secure_port)
+        self.assertIn("certificate verify failed", str(exc.exception),
+                      "unexpected exception: %s" % str(exc.exception))
+
+        # this should pass: disable hostname check:
+        self.run_qdmanage(query + " --ssl-trustfile " +
+                          self.ssl_file('ca-certificate.pem') +
+                          " --ssl-disable-peer-name-verify",
+                          address="amqps://127.0.0.1:%s" % self.secure_port)
+
+        # this should fail: router requires client to authenticate
+        with self.assertRaises(RuntimeError,
+                               msg="client authentication should fail") as exc:
+            self.run_qdmanage(query + " --ssl-trustfile " +
+                              self.ssl_file('ca-certificate.pem'),
+                              address=ssl_user_address)
+        self.assertIn("SSL Failure", str(exc.exception),
+                      "unexpected exception: %s" % str(exc.exception))
+
+        # this should pass: qdmanage provides credentials
+        self.run_qdmanage(query + " --ssl-trustfile " +
+                          self.ssl_file('ca-certificate.pem') +
+                          " --ssl-certificate " +
+                          self.ssl_file('client-certificate.pem') +
+                          " --ssl-password-file " +
+                          self.ssl_file('client-password-file.txt') +
+                          " --ssl-key " +
+                          self.ssl_file('client-private-key.pem'),
+                          address=ssl_user_address)
+
 
 if __name__ == '__main__':
     unittest.main(main_module())
diff --git a/tests/system_tests_qdstat.py b/tests/system_tests_qdstat.py
index 9258c63..157622e 100644
--- a/tests/system_tests_qdstat.py
+++ b/tests/system_tests_qdstat.py
@@ -19,49 +19,58 @@
 
 import os
 import re
-import system_test
 import unittest
 from subprocess import PIPE
 from proton import Url, SSLDomain, SSLUnavailable, SASL
-from system_test import main_module
+from system_test import main_module, TIMEOUT, TestCase, Qdrouterd, DIR
 from proton.utils import BlockingConnection
 
 
-class QdstatTest(system_test.TestCase):
+class QdstatTestBase(TestCase):
+    """Define run_qdstat for use with all tests"""
+
+    def run_qdstat(self, args, address=None, regex=None):
+        if args is None:
+            args = []
+        args = ['qdstat',
+                '--bus', str(address or self.address()),
+                '--timeout', str(TIMEOUT)] + args
+
+        p = self.popen(args, name='qdstat-' + self.id(), stdout=PIPE,
+                       expect=None, universal_newlines=True)
+        out, err = p.communicate()
+
+        if p.returncode != 0:
+            raise RuntimeError("qdstat failed: %s (%s, %s)" % (p.returncode,
+                                                               out, err))
+        if regex is not None:
+            pattern = re.compile(regex, re.I)
+            self.assertRegex(out, pattern)
+
+        return out
+
+    def address(self):
+        """Can be overridden by tests"""
+        return self.router.addresses[0]
+
+
+class QdstatTest(QdstatTestBase):
     """Test qdstat tool output"""
     @classmethod
     def setUpClass(cls):
         super(QdstatTest, cls).setUpClass()
-        config = system_test.Qdrouterd.Config([
+        config = Qdrouterd.Config([
             ('router', {'id': 'QDR.A', 'workerThreads': 1}),
             ('listener', {'port': cls.tester.get_port()}),
         ])
         cls.router = cls.tester.qdrouterd('test-router', config)
 
-    def run_qdstat(self, args, regexp=None, address=None):
-        if args:
-            popen_args = ['qdstat', '--bus', str(address or 
self.router.addresses[0]), '--timeout', str(system_test.TIMEOUT)] + args
-        else:
-            popen_args = ['qdstat', '--bus',
-                          str(address or self.router.addresses[0]),
-                          '--timeout', str(system_test.TIMEOUT)]
-
-        p = self.popen(popen_args,
-                       name='qdstat-' + self.id(), stdout=PIPE, expect=None,
-                       universal_newlines=True)
-
-        out = p.communicate()[0]
-        assert p.returncode == 0, \
-            "qdstat exit status %s, output:\n%s" % (p.returncode, out)
-        if regexp:
-            assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" % 
(regexp, out)
-        return out
-
     def test_help(self):
-        self.run_qdstat(['--help'], r'Usage: qdstat')
+        self.run_qdstat(['--help'], regex=r'Usage: qdstat')
 
     def test_general(self):
-        out = self.run_qdstat(['--general'], r'(?s)Router Statistics.*Mode\s*Standalone')
+        out = self.run_qdstat(['--general'],
+                              regex=r'(?s)Router Statistics.*Mode\s*Standalone')
 
         self.assertTrue(re.match(r"(.*)\bConnections\b[ \t]+\b1\b(.*)",
                                  out, flags=re.DOTALL) is not None, out)
@@ -81,7 +90,9 @@ class QdstatTest(system_test.TestCase):
         self.assertEqual(out.count("QDR.A"), 2)
 
     def test_general_csv(self):
-        out = self.run_qdstat(['--general', '--csv'], r'(?s)Router Statistics.*Mode","Standalone')
+        out = self.run_qdstat(['--general', '--csv'],
+                              regex=r'(?s)Router Statistics.*Mode","Standalone')
+
         self.assertIn('"Connections","1"', out)
         self.assertIn('"Worker Threads","1"', out)
         self.assertIn('"Nodes","0"', out)
@@ -92,24 +103,24 @@ class QdstatTest(system_test.TestCase):
         self.assertEqual(out.count("QDR.A"), 2)
 
     def test_connections(self):
-        self.run_qdstat(['--connections'], r'host.*container.*role')
-        outs = self.run_qdstat(['--connections'], 'no-auth')
-        outs = self.run_qdstat(['--connections'], 'QDR.A')
+        self.run_qdstat(['--connections'], regex=r'host.*container.*role')
+        outs = self.run_qdstat(['--connections'], regex=r'no-auth')
+        outs = self.run_qdstat(['--connections'], regex=r'QDR.A')
 
     def test_connections_csv(self):
-        self.run_qdstat(['--connections', "--csv"], r'host.*container.*role')
-        outs = self.run_qdstat(['--connections'], 'no-auth')
-        outs = self.run_qdstat(['--connections'], 'QDR.A')
+        self.run_qdstat(['--connections', "--csv"], regex=r'host.*container.*role')
+        outs = self.run_qdstat(['--connections'], regex=r'no-auth')
+        outs = self.run_qdstat(['--connections'], regex=r'QDR.A')
 
     def test_links(self):
-        self.run_qdstat(['--links'], r'QDR.A')
-        out = self.run_qdstat(['--links'], r'endpoint.*out.*local.*temp.')
+        self.run_qdstat(['--links'], regex=r'QDR.A')
+        out = self.run_qdstat(['--links'], regex=r'endpoint.*out.*local.*temp.')
         parts = out.split("\n")
         self.assertEqual(len(parts), 9)
 
     def test_links_csv(self):
-        self.run_qdstat(['--links', "--csv"], r'QDR.A')
-        out = self.run_qdstat(['--links'], r'endpoint.*out.*local.*temp.')
+        self.run_qdstat(['--links', "--csv"], regex=r'QDR.A')
+        out = self.run_qdstat(['--links'], 
regex=r'endpoint.*out.*local.*temp.')
         parts = out.split("\n")
         self.assertEqual(len(parts), 9)
 
@@ -124,20 +135,20 @@ class QdstatTest(system_test.TestCase):
         self.assertEqual(len(parts), 7)
 
     def test_nodes(self):
-        self.run_qdstat(['--nodes'], r'No Router List')
+        self.run_qdstat(['--nodes'], regex=r'No Router List')
 
     def test_nodes_csv(self):
-        self.run_qdstat(['--nodes', "--csv"], r'No Router List')
+        self.run_qdstat(['--nodes', "--csv"], regex=r'No Router List')
 
     def test_address(self):
-        out = self.run_qdstat(['--address'], r'QDR.A')
-        out = self.run_qdstat(['--address'], r'\$management')
+        out = self.run_qdstat(['--address'], regex=r'QDR.A')
+        out = self.run_qdstat(['--address'], regex=r'\$management')
         parts = out.split("\n")
         self.assertEqual(len(parts), 11)
 
     def test_address_csv(self):
-        out = self.run_qdstat(['--address'], r'QDR.A')
-        out = self.run_qdstat(['--address'], r'\$management')
+        out = self.run_qdstat(['--address'], regex=r'QDR.A')
+        out = self.run_qdstat(['--address'], regex=r'\$management')
         parts = out.split("\n")
         self.assertEqual(len(parts), 11)
 
@@ -261,7 +272,7 @@ class QdstatTest(system_test.TestCase):
         self.assertIn("Total Denials", out)
 
     def test_log(self):
-        self.run_qdstat(['--log', '--limit=5'], r'AGENT \(debug\).*GET-LOG')
+        self.run_qdstat(['--log', '--limit=5'], regex=r'AGENT \(debug\).*GET-LOG')
 
     def test_yy_query_many_links(self):
         # This test will fail without the fix for DISPATCH-974
@@ -450,12 +461,12 @@ class QdstatTest(system_test.TestCase):
         c.close()
 
 
-class QdstatTestVhostPolicy(system_test.TestCase):
+class QdstatTestVhostPolicy(QdstatTestBase):
     """Test qdstat-with-policy tool output"""
     @classmethod
     def setUpClass(cls):
         super(QdstatTestVhostPolicy, cls).setUpClass()
-        config = system_test.Qdrouterd.Config([
+        config = Qdrouterd.Config([
             ('router', {'id': 'QDR.A', 'workerThreads': 1}),
             ('listener', {'port': cls.tester.get_port()}),
             ('policy', {'maxConnections': 100, 'enableVhostPolicy': 'true'}),
@@ -483,25 +494,6 @@ class QdstatTestVhostPolicy(system_test.TestCase):
         ])
         cls.router = cls.tester.qdrouterd('test-router', config)
 
-    def run_qdstat(self, args, regexp=None, address=None):
-        if args:
-            popen_args = ['qdstat', '--bus', str(address or 
self.router.addresses[0]), '--timeout', str(system_test.TIMEOUT)] + args
-        else:
-            popen_args = ['qdstat', '--bus',
-                          str(address or self.router.addresses[0]),
-                          '--timeout', str(system_test.TIMEOUT)]
-
-        p = self.popen(popen_args,
-                       name='qdstat-' + self.id(), stdout=PIPE, expect=None,
-                       universal_newlines=True)
-
-        out = p.communicate()[0]
-        assert p.returncode == 0, \
-            "qdstat exit status %s, output:\n%s" % (p.returncode, out)
-        if regexp:
-            assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" % 
(regexp, out)
-        return out
-
     def test_vhost(self):
         out = self.run_qdstat(['--vhosts'])
         self.assertIn("Vhosts", out)
@@ -541,19 +533,19 @@ class QdstatTestVhostPolicy(system_test.TestCase):
         self.assertIn("remote hosts", out)
 
 
-class QdstatLinkPriorityTest(system_test.TestCase):
+class QdstatLinkPriorityTest(QdstatTestBase):
     """Need 2 routers to get inter-router links for the link priority test"""
     @classmethod
     def setUpClass(cls):
         super(QdstatLinkPriorityTest, cls).setUpClass()
         cls.inter_router_port = cls.tester.get_port()
-        config_1 = system_test.Qdrouterd.Config([
+        config_1 = Qdrouterd.Config([
             ('router', {'mode': 'interior', 'id': 'R1'}),
             ('listener', {'port': cls.tester.get_port()}),
             ('connector', {'role': 'inter-router', 'port': 
cls.inter_router_port})
         ])
 
-        config_2 = system_test.Qdrouterd.Config([
+        config_2 = Qdrouterd.Config([
             ('router', {'mode': 'interior', 'id': 'R2'}),
             ('listener', {'role': 'inter-router', 'port': 
cls.inter_router_port}),
         ])
@@ -565,17 +557,6 @@ class QdstatLinkPriorityTest(system_test.TestCase):
     def address(self):
         return self.router_1.addresses[0]
 
-    def run_qdstat(self, args):
-        p = self.popen(
-            ['qdstat', '--bus', str(self.address()), '--timeout', 
str(system_test.TIMEOUT)] + args,
-            name='qdstat-' + self.id(), stdout=PIPE, expect=None,
-            universal_newlines=True)
-
-        out = p.communicate()[0]
-        assert p.returncode == 0, \
-            "qdstat exit status %s, output:\n%s" % (p.returncode, out)
-        return out
-
     def test_link_priority(self):
         out = self.run_qdstat(['--links'])
         lines = out.split("\n")
@@ -691,327 +672,332 @@ class QdstatLinkPriorityTest(system_test.TestCase):
         self._test_all_entities_all_routers(['--all-entities', '--csv', 
'--all-routers'])
 
 
-try:
-    SSLDomain(SSLDomain.MODE_CLIENT)
-
-    class QdstatSslTest(system_test.TestCase):
-        """Test qdstat tool output"""
-
-        @staticmethod
-        def ssl_file(name):
-            return os.path.join(system_test.DIR, 'ssl_certs', name)
-
-        @staticmethod
-        def sasl_path():
-            return os.path.join(system_test.DIR, 'sasl_configs')
-
-        @classmethod
-        def setUpClass(cls):
-            super(QdstatSslTest, cls).setUpClass()
-            # Write SASL configuration file:
-            with open('tests-mech-EXTERNAL.conf', 'w') as sasl_conf:
-                sasl_conf.write("mech_list: EXTERNAL ANONYMOUS DIGEST-MD5 PLAIN\n")
-            # qdrouterd configuration:
-            config = system_test.Qdrouterd.Config([
-                ('router', {'id': 'QDR.B',
-                            'saslConfigPath': os.getcwd(),
-                            'workerThreads': 1,
-                            'saslConfigName': 'tests-mech-EXTERNAL'}),
-                ('sslProfile', {'name': 'server-ssl',
-                                'caCertFile': 
cls.ssl_file('ca-certificate.pem'),
-                                'certFile': 
cls.ssl_file('server-certificate.pem'),
-                                'privateKeyFile': 
cls.ssl_file('server-private-key.pem'),
-                                'password': 'server-password'}),
-                ('listener', {'host': 'localhost', 'port': 
cls.tester.get_port()}),
-                ('listener', {'host': 'localhost', 'port': 
cls.tester.get_port(), 'sslProfile': 'server-ssl',
-                              'authenticatePeer': 'no', 'requireSsl': 'yes'}),
-                ('listener', {'host': 'localhost', 'port': 
cls.tester.get_port(), 'sslProfile': 'server-ssl',
-                              'authenticatePeer': 'no', 'requireSsl': 'no'}),
-                ('listener', {'host': 'localhost', 'port': 
cls.tester.get_port(), 'sslProfile': 'server-ssl',
-                              'authenticatePeer': 'yes', 'requireSsl': 'yes',
-                              'saslMechanisms': 'EXTERNAL'})
-            ])
-            cls.router = cls.tester.qdrouterd('test-router', config)
-
-        def run_qdstat(self, args, regexp=None, address=None):
-            p = self.popen(
-                ['qdstat', '--bus', str(address or self.router.addresses[0]),
-                 '--timeout', str(system_test.TIMEOUT)] + args,
-                name='qdstat-' + self.id(), stdout=PIPE, expect=None,
-                universal_newlines=True)
-
-            out = p.communicate()[0]
-            assert p.returncode == 0, \
-                "qdstat exit status %s, output:\n%s" % (p.returncode, out)
-            if regexp:
-                assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" 
% (regexp, out)
-            return out
-
-        def get_ssl_args(self):
-            args = dict(
-                sasl_external=['--sasl-mechanisms', 'EXTERNAL'],
-                trustfile=['--ssl-trustfile', 
self.ssl_file('ca-certificate.pem')],
-                bad_trustfile=['--ssl-trustfile', 
self.ssl_file('bad-ca-certificate.pem')],
-                client_cert=['--ssl-certificate', 
self.ssl_file('client-certificate.pem')],
-                client_key=['--ssl-key', 
self.ssl_file('client-private-key.pem')],
-                client_pass=['--ssl-password', 'client-password'])
-            args['client_cert_all'] = args['client_cert'] + args['client_key'] 
+ args['client_pass']
-
-            return args
-
-        def ssl_test(self, url_name, arg_names):
-            """Run simple SSL connection test with supplied parameters.
-            See test_ssl_* below.
-            """
-            args = self.get_ssl_args()
-            addrs = [self.router.addresses[i] for i in range(4)]
-            urls = dict(zip(['none', 'strict', 'unsecured', 'auth'], addrs))
-            urls.update(zip(['none_s', 'strict_s', 'unsecured_s', 'auth_s'],
-                            (Url(a, scheme="amqps") for a in addrs)))
-            self.run_qdstat(['--general'] + sum([args[n] for n in arg_names], 
[]),
-                            regexp=r'(?s)Router Statistics.*Mode\s*Standalone',
-                            address=str(urls[url_name]))
-
-        def ssl_test_bad(self, url_name, arg_names):
-            self.assertRaises(AssertionError, self.ssl_test, url_name, 
arg_names)
-
-        # qdstat -b amqp://localhost:<port> --general and makes sure
-        # the router sends back a valid response.
-        def test_ssl_none(self):
-            self.ssl_test('none', [])
-
-        # qdstat -b amqps://localhost:<port> --general
-        # Make sure that the command fails.
-        def test_ssl_scheme_to_none(self):
-            self.ssl_test_bad('none_s', [])
-
-        # qdstat -b amqp://localhost:<port> --general --ssl-certificate 
/path/to/client-certificate.pem
-        # Makes sure the command fails.
-        def test_ssl_cert_to_none(self):
-            self.ssl_test_bad('none', ['client_cert'])
-
-        # Tries to run the following command on a listener that requires SSL 
(requireSsl:yes)
-        # qdstat -b amqp://localhost:<port> --general
-        # Makes sure the command fails.
-        def test_ssl_none_to_strict(self):
-            self.ssl_test_bad('strict', [])
-
-        # qdstat -b amqps://localhost:<port> --general
-        def test_ssl_schema_to_strict(self):
-            self.ssl_test_bad('strict_s', [])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-certificate 
/path/to/client-certificate.pem
-        # --ssl-key /path/to/client-private-key.pem --ssl-password 
client-password'
-        def test_ssl_cert_to_strict(self):
-            self.ssl_test_bad('strict_s', ['client_cert_all'])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem
-        def test_ssl_trustfile_to_strict(self):
-            self.ssl_test('strict_s', ['trustfile'])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-trustfile
-        # /path/to/ca-certificate.pem --ssl-certificate 
/path/to/client-certificate.pem
-        # --ssl-key /path/to/client-private-key.pem --ssl-password 
client-password
-        def test_ssl_trustfile_cert_to_strict(self):
-            self.ssl_test('strict_s', ['trustfile', 'client_cert_all'])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/bad-ca-certificate.pem
-        # Send in a bad ca cert and make sure the test fails.
-        def test_ssl_bad_trustfile_to_strict(self):
-            self.ssl_test_bad('strict_s', ['bad_trustfile'])
-
-        # Require-auth SSL listener
-        # qdstat -b amqp://localhost:<port> --general
-        # Send in no certs to a 'authenticatePeer': 'yes', 'requireSsl': 'yes' 
listener and make sure it fails.
-        # Also protocol is amqp not amqps
-        def test_ssl_none_to_auth(self):
-            self.ssl_test_bad('auth', [])
-
-        # qdstat -b amqps://localhost:28491 --general
-        # Send in no certs to a 'authenticatePeer': 'yes', 'requireSsl': 'yes' 
listener and make sure it fails.
-        def test_ssl_schema_to_auth(self):
-            self.ssl_test_bad('auth_s', [])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem'
-        # Send in just a trustfile to an 'authenticatePeer': 'yes', 
'requireSsl': 'yes' listener and make sure it fails.
-        def test_ssl_trustfile_to_auth(self):
-            self.ssl_test_bad('auth_s', ['trustfile'])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-certificate 
/path/to/client-certificate.pem
-        # --ssl-key /path/to/client-private-key.pem --ssl-password 
client-password
-        # Without a trustfile, this test fails
-        def test_ssl_cert_to_auth(self):
-            self.ssl_test_bad('auth_s', ['client_cert_all'])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem
-        # --ssl-certificate /path/to/client-certificate.pem
-        # --ssl-key /path/to/client-private-key.pem --ssl-password 
client-password
-        # This has everything, the test should pass.
-        def test_ssl_trustfile_cert_to_auth(self):
-            self.ssl_test('auth_s', ['trustfile', 'client_cert_all'])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/bad-ca-certificate.pem
-        # --ssl-certificate /path/to/client-certificate.pem --ssl-key 
/path/to/client-private-key.pem
-        # --ssl-password client-password
-        # Bad trustfile should be rejected.
-        def test_ssl_bad_trustfile_to_auth(self):
-            self.ssl_test_bad('auth_s', ['bad_trustfile', 'client_cert_all'])
-
-        # qdstat -b amqps://localhost:<port> --general --sasl-mechanisms 
EXTERNAL
-        # --ssl-certificate /path/to/client-certificate.pem --ssl-key 
/path/to/client-private-key.pem
-        # --ssl-password client-password --ssl-trustfile 
/path/to/ca-certificate.pem'
-        def test_ssl_cert_explicit_external_to_auth(self):
-            self.ssl_test('auth_s', ['sasl_external', 'client_cert_all', 
'trustfile'])
-
-        # Unsecured SSL listener, allows non-SSL
-        # qdstat -b amqp://localhost:<port> --general
-        def test_ssl_none_to_unsecured(self):
-            self.ssl_test('unsecured', [])
-
-        # qdstat -b amqps://localhost:<port> --general
-        def test_ssl_schema_to_unsecured(self):
-            self.ssl_test_bad('unsecured_s', [])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-certificate 
/path/to/client-certificate.pem --ssl-key
-        # /path/to/client-private-key.pem --ssl-password client-password
-        # A trustfile is required, test will fail
-        def test_ssl_cert_to_unsecured(self):
-            self.ssl_test_bad('unsecured_s', ['client_cert_all'])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem'
-        # Just send in the trustfile, should be all good.
-        def test_ssl_trustfile_to_unsecured(self):
-            self.ssl_test('unsecured_s', ['trustfile'])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem
-        # --ssl-certificate /path/to/client-certificate.pem --ssl-key 
/path/to/client-private-key.pem
-        # --ssl-password client-password
-        # We have everything, this should work.
-        def test_ssl_trustfile_cert_to_unsecured(self):
-            self.ssl_test('unsecured_s', ['trustfile', 'client_cert_all'])
-
-        # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/bad-ca-certificate.pem']
-        # Bad trustfile, test will fail.
-        def test_ssl_bad_trustfile_to_unsecured(self):
-            self.ssl_test_bad('unsecured_s', ['bad_trustfile'])
-
-except SSLUnavailable:
-    class QdstatSslTest(system_test.TestCase):
-        def test_skip(self):
-            self.skipTest("Proton SSL support unavailable.")
-
-try:
-    SSLDomain(SSLDomain.MODE_CLIENT)
-
-    class QdstatSslTestSslPasswordFile(QdstatSslTest):
+def _has_ssl():
+    try:
+        SSLDomain(SSLDomain.MODE_CLIENT)
+        return True
+    except SSLUnavailable:
+        return False
+
+
+@unittest.skipIf(_has_ssl() is False, "Proton SSL support unavailable")
+class QdstatSslTest(QdstatTestBase):
+    """Test qdstat tool output"""
+
+    @staticmethod
+    def ssl_file(name):
+        return os.path.join(DIR, 'ssl_certs', name)
+
+    @staticmethod
+    def sasl_path():
+        return os.path.join(DIR, 'sasl_configs')
+
+    @classmethod
+    def setUpClass(cls):
+        super(QdstatSslTest, cls).setUpClass()
+        # Write SASL configuration file:
+        with open('tests-mech-EXTERNAL.conf', 'w') as sasl_conf:
+            sasl_conf.write("mech_list: EXTERNAL ANONYMOUS DIGEST-MD5 PLAIN\n")
+        # qdrouterd configuration.  Note well: modifying the order of
+        # listeners will require updating ssl_test()
+        cls.strict_port = cls.tester.get_port()
+        config = Qdrouterd.Config([
+            ('router', {'id': 'QDR.B',
+                        'saslConfigPath': os.getcwd(),
+                        'workerThreads': 1,
+                        'saslConfigName': 'tests-mech-EXTERNAL'}),
+            ('sslProfile', {'name': 'server-ssl',
+                            'caCertFile': cls.ssl_file('ca-certificate.pem'),
+                            'certFile': cls.ssl_file('server-certificate.pem'),
+                            'privateKeyFile': cls.ssl_file('server-private-key.pem'),
+                            'password': 'server-password'}),
+
+            # 'none', 'none_s':
+            ('listener', {'host': 'localhost', 'port': cls.tester.get_port()}),
+
+            # 'strict', 'strict_s':
+            ('listener', {'host': 'localhost', 'port': cls.strict_port, 
'sslProfile': 'server-ssl',
+                          'authenticatePeer': 'no', 'requireSsl': 'yes'}),
+
+            # 'unsecured', 'unsecured_s':
+            ('listener', {'host': 'localhost', 'port': cls.tester.get_port(), 
'sslProfile': 'server-ssl',
+                          'authenticatePeer': 'no', 'requireSsl': 'no'}),
+
+            # 'auth', 'auth_s':
+            ('listener', {'host': 'localhost', 'port': cls.tester.get_port(), 
'sslProfile': 'server-ssl',
+                          'authenticatePeer': 'yes', 'requireSsl': 'yes',
+                          'saslMechanisms': 'EXTERNAL'})
+        ])
+        cls.router = cls.tester.qdrouterd('test-router', config)
+
+    def get_ssl_args(self):
+        """A map of short names to the corresponding qdstat arguments.  A list
+        of keys are passed to ssl_test() via the arg_names parameter list.
         """
-        Tests the --ssl-password-file command line parameter
+        args = dict(
+            sasl_external=['--sasl-mechanisms', 'EXTERNAL'],
+            trustfile=['--ssl-trustfile', self.ssl_file('ca-certificate.pem')],
+            bad_trustfile=['--ssl-trustfile', 
self.ssl_file('bad-ca-certificate.pem')],
+            client_cert=['--ssl-certificate', 
self.ssl_file('client-certificate.pem')],
+            client_key=['--ssl-key', self.ssl_file('client-private-key.pem')],
+            client_pass=['--ssl-password', 'client-password'])
+        args['client_cert_all'] = args['client_cert'] + args['client_key'] + args['client_pass']
+
+        return args
+
+    def ssl_test(self, url_name, arg_names):
+        """Run simple SSL connection test with supplied parameters.
+
+        :param url_name: a shorthand name used to select which router
+        interface qdstat will connect to:
+        'none' = No SSL or SASL config
+        'strict' = Require SSL, No SASL config
+        'unsecured' = SSL not required, SASL not required
+        'auth' = Require SASL (external) and SSL
+
+        The '*_s' names simply replace 'amqp:' with 'amqps:' in the
+        generated URL.
+
+        See test_ssl_* below.
         """
+        args = self.get_ssl_args()
+        addrs = [self.router.addresses[i] for i in range(4)]
+        urls = dict(zip(['none', 'strict', 'unsecured', 'auth'], addrs))
+        urls.update(zip(['none_s', 'strict_s', 'unsecured_s', 'auth_s'],
+                        (Url(a, scheme="amqps") for a in addrs)))
+        self.run_qdstat(['--general'] + sum([args[n] for n in arg_names], []),
+                        address=str(urls[url_name]),
+                        regex=r'(?s)Router Statistics.*Mode\s*Standalone')
+
+    def ssl_test_bad(self, url_name, arg_names):
+        self.assertRaises(RuntimeError, self.ssl_test, url_name, arg_names)
+
+    # Runs 'qdstat -b amqp://localhost:<port> --general' and makes sure
+    # the router sends back a valid response.
+    def test_ssl_none(self):
+        self.ssl_test('none', [])
+
+    # qdstat -b amqps://localhost:<port> --general
+    # Make sure that the command fails.
+    def test_ssl_scheme_to_none(self):
+        self.ssl_test_bad('none_s', [])
+
+    # qdstat -b amqp://localhost:<port> --general --ssl-certificate 
/path/to/client-certificate.pem
+    # Makes sure the command fails.
+    def test_ssl_cert_to_none(self):
+        self.ssl_test_bad('none', ['client_cert'])
+
+    # Tries to run the following command on a listener that requires SSL 
(requireSsl:yes)
+    # qdstat -b amqp://localhost:<port> --general
+    # Makes sure the command fails.
+    def test_ssl_none_to_strict(self):
+        self.ssl_test_bad('strict', [])
+
+    # qdstat -b amqps://localhost:<port> --general
+    def test_ssl_schema_to_strict(self):
+        self.ssl_test_bad('strict_s', [])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-certificate 
/path/to/client-certificate.pem
+    # --ssl-key /path/to/client-private-key.pem --ssl-password client-password
+    def test_ssl_cert_to_strict(self):
+        self.ssl_test_bad('strict_s', ['client_cert_all'])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem
+    def test_ssl_trustfile_to_strict(self):
+        self.ssl_test('strict_s', ['trustfile'])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-trustfile
+    # /path/to/ca-certificate.pem --ssl-certificate 
/path/to/client-certificate.pem
+    # --ssl-key /path/to/client-private-key.pem --ssl-password client-password
+    def test_ssl_trustfile_cert_to_strict(self):
+        self.ssl_test('strict_s', ['trustfile', 'client_cert_all'])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/bad-ca-certificate.pem
+    # Send in a bad CA cert and make sure the command fails.
+    def test_ssl_bad_trustfile_to_strict(self):
+        self.ssl_test_bad('strict_s', ['bad_trustfile'])
+
+    # Require-auth SSL listener
+    # qdstat -b amqp://localhost:<port> --general
+    # Send in no certs to a 'authenticatePeer': 'yes', 'requireSsl': 'yes' 
listener and make sure it fails.
+    # Also protocol is amqp not amqps
+    def test_ssl_none_to_auth(self):
+        self.ssl_test_bad('auth', [])
+
+    # qdstat -b amqps://localhost:<port> --general
+    # Send in no certs to a 'authenticatePeer': 'yes', 'requireSsl': 'yes' 
listener and make sure it fails.
+    def test_ssl_schema_to_auth(self):
+        self.ssl_test_bad('auth_s', [])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem
+    # Send in just a trustfile to an 'authenticatePeer': 'yes', 'requireSsl': 
'yes' listener and make sure it fails.
+    def test_ssl_trustfile_to_auth(self):
+        self.ssl_test_bad('auth_s', ['trustfile'])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-certificate 
/path/to/client-certificate.pem
+    # --ssl-key /path/to/client-private-key.pem --ssl-password client-password
+    # Without a trustfile, the command fails
+    def test_ssl_cert_to_auth(self):
+        self.ssl_test_bad('auth_s', ['client_cert_all'])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem
+    # --ssl-certificate /path/to/client-certificate.pem
+    # --ssl-key /path/to/client-private-key.pem --ssl-password client-password
+    # This has everything, the test should pass.
+    def test_ssl_trustfile_cert_to_auth(self):
+        self.ssl_test('auth_s', ['trustfile', 'client_cert_all'])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/bad-ca-certificate.pem
+    # --ssl-certificate /path/to/client-certificate.pem --ssl-key 
/path/to/client-private-key.pem
+    # --ssl-password client-password
+    # Bad trustfile should be rejected.
+    def test_ssl_bad_trustfile_to_auth(self):
+        self.ssl_test_bad('auth_s', ['bad_trustfile', 'client_cert_all'])
+
+    # qdstat -b amqps://localhost:<port> --general --sasl-mechanisms EXTERNAL
+    # --ssl-certificate /path/to/client-certificate.pem --ssl-key 
/path/to/client-private-key.pem
+    # --ssl-password client-password --ssl-trustfile 
/path/to/ca-certificate.pem
+    def test_ssl_cert_explicit_external_to_auth(self):
+        self.ssl_test('auth_s', ['sasl_external', 'client_cert_all', 
'trustfile'])
+
+    # Unsecured SSL listener, allows non-SSL
+    # qdstat -b amqp://localhost:<port> --general
+    def test_ssl_none_to_unsecured(self):
+        self.ssl_test('unsecured', [])
+
+    # qdstat -b amqps://localhost:<port> --general
+    def test_ssl_schema_to_unsecured(self):
+        self.ssl_test_bad('unsecured_s', [])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-certificate 
/path/to/client-certificate.pem --ssl-key
+    # /path/to/client-private-key.pem --ssl-password client-password
+    # A trustfile is required, so the command will fail
+    def test_ssl_cert_to_unsecured(self):
+        self.ssl_test_bad('unsecured_s', ['client_cert_all'])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem
+    # Just send in the trustfile, should be all good.
+    def test_ssl_trustfile_to_unsecured(self):
+        self.ssl_test('unsecured_s', ['trustfile'])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/ca-certificate.pem
+    # --ssl-certificate /path/to/client-certificate.pem --ssl-key 
/path/to/client-private-key.pem
+    # --ssl-password client-password
+    # We have everything, this should work.
+    def test_ssl_trustfile_cert_to_unsecured(self):
+        self.ssl_test('unsecured_s', ['trustfile', 'client_cert_all'])
+
+    # qdstat -b amqps://localhost:<port> --general --ssl-trustfile 
/path/to/bad-ca-certificate.pem
+    # Bad trustfile, so the command will fail.
+    def test_ssl_bad_trustfile_to_unsecured(self):
+        self.ssl_test_bad('unsecured_s', ['bad_trustfile'])
+
+    def test_ssl_peer_name_verify_disabled(self):
+        """Verify the --ssl-disable-peer-name-verify option"""
+        params = [x for x in self.get_ssl_args()['trustfile']]
+
+        # Expect the connection to fail, since the certificate has
+        # 'localhost' as the peer name and we used '127.0.0.1' instead.
+
+        with self.assertRaises(RuntimeError,
+                               msg="expected fail: host name wrong") as exc:
+            self.run_qdstat(address="amqps://127.0.0.1:%s" % self.strict_port,
+                            args=['--general'] + params)
+
+        # repeat the same operation but using
+        # --ssl-disable-peer-name-verify.  This should succeed:
+
+        self.run_qdstat(address="amqps://127.0.0.1:%s" % self.strict_port,
+                        args=['--general',
+                              '--ssl-disable-peer-name-verify'] + params)
+
+
+@unittest.skipIf(_has_ssl() is False, "Proton SSL support unavailable")
+class QdstatSslTestSslPasswordFile(QdstatSslTest):
+    """
+    Tests the --ssl-password-file command line parameter
+    """
+
+    def get_ssl_args(self):
+        args = dict(
+            sasl_external=['--sasl-mechanisms', 'EXTERNAL'],
+            trustfile=['--ssl-trustfile', self.ssl_file('ca-certificate.pem')],
+            bad_trustfile=['--ssl-trustfile', 
self.ssl_file('bad-ca-certificate.pem')],
+            client_cert=['--ssl-certificate', 
self.ssl_file('client-certificate.pem')],
+            client_key=['--ssl-key', self.ssl_file('client-private-key.pem')],
+            client_pass=['--ssl-password-file', 
self.ssl_file('client-password-file.txt')])
+        args['client_cert_all'] = args['client_cert'] + args['client_key'] + 
args['client_pass']
+
+        return args
+
+
+@unittest.skipIf(_has_ssl() is False, "Proton SSL support unavailable")
+class QdstatSslNoExternalTest(QdstatTestBase):
+    """Test qdstat can't connect without sasl_mech EXTERNAL"""
+
+    @staticmethod
+    def ssl_file(name):
+        return os.path.join(DIR, 'ssl_certs', name)
+
+    @staticmethod
+    def sasl_path():
+        return os.path.join(DIR, 'sasl_configs')
+
+    @classmethod
+    def setUpClass(cls):
+        super(QdstatSslNoExternalTest, cls).setUpClass()
+        # Write SASL configuration file:
+        with open('tests-mech-NOEXTERNAL.conf', 'w') as sasl_conf:
+            sasl_conf.write("mech_list: ANONYMOUS DIGEST-MD5 PLAIN\n")
+        # qdrouterd configuration:
+        config = Qdrouterd.Config([
+            ('router', {'id': 'QDR.C',
+                        'saslConfigPath': os.getcwd(),
+                        'workerThreads': 1,
+                        'saslConfigName': 'tests-mech-NOEXTERNAL'}),
+            ('sslProfile', {'name': 'server-ssl',
+                            'caCertFile': cls.ssl_file('ca-certificate.pem'),
+                            'certFile': cls.ssl_file('server-certificate.pem'),
+                            'privateKeyFile': 
cls.ssl_file('server-private-key.pem'),
+                            'password': 'server-password'}),
+            ('listener', {'port': cls.tester.get_port()}),
+            ('listener', {'port': cls.tester.get_port(), 'sslProfile': 
'server-ssl', 'authenticatePeer': 'no', 'requireSsl': 'yes'}),
+            ('listener', {'port': cls.tester.get_port(), 'sslProfile': 
'server-ssl', 'authenticatePeer': 'no', 'requireSsl': 'no'}),
+            ('listener', {'port': cls.tester.get_port(), 'sslProfile': 
'server-ssl', 'authenticatePeer': 'yes', 'requireSsl': 'yes',
+                          'saslMechanisms': 'EXTERNAL'})
+        ])
+        cls.router = cls.tester.qdrouterd('test-router', config)
 
-        def get_ssl_args(self):
-            args = dict(
-                sasl_external=['--sasl-mechanisms', 'EXTERNAL'],
-                trustfile=['--ssl-trustfile', 
self.ssl_file('ca-certificate.pem')],
-                bad_trustfile=['--ssl-trustfile', 
self.ssl_file('bad-ca-certificate.pem')],
-                client_cert=['--ssl-certificate', 
self.ssl_file('client-certificate.pem')],
-                client_key=['--ssl-key', 
self.ssl_file('client-private-key.pem')],
-                client_pass=['--ssl-password-file', 
self.ssl_file('client-password-file.txt')])
-            args['client_cert_all'] = args['client_cert'] + args['client_key'] 
+ args['client_pass']
-
-            return args
-
-except SSLUnavailable:
-    class QdstatSslTest(system_test.TestCase):
-        def test_skip(self):
-            self.skipTest("Proton SSL support unavailable.")
-
-try:
-    SSLDomain(SSLDomain.MODE_CLIENT)
-
-    class QdstatSslNoExternalTest(system_test.TestCase):
-        """Test qdstat can't connect without sasl_mech EXTERNAL"""
-
-        @staticmethod
-        def ssl_file(name):
-            return os.path.join(system_test.DIR, 'ssl_certs', name)
-
-        @staticmethod
-        def sasl_path():
-            return os.path.join(system_test.DIR, 'sasl_configs')
-
-        @classmethod
-        def setUpClass(cls):
-            super(QdstatSslNoExternalTest, cls).setUpClass()
-            # Write SASL configuration file:
-            with open('tests-mech-NOEXTERNAL.conf', 'w') as sasl_conf:
-                sasl_conf.write("mech_list: ANONYMOUS DIGEST-MD5 PLAIN\n")
-            # qdrouterd configuration:
-            config = system_test.Qdrouterd.Config([
-                ('router', {'id': 'QDR.C',
-                            'saslConfigPath': os.getcwd(),
-                            'workerThreads': 1,
-                            'saslConfigName': 'tests-mech-NOEXTERNAL'}),
-                ('sslProfile', {'name': 'server-ssl',
-                                'caCertFile': 
cls.ssl_file('ca-certificate.pem'),
-                                'certFile': 
cls.ssl_file('server-certificate.pem'),
-                                'privateKeyFile': 
cls.ssl_file('server-private-key.pem'),
-                                'password': 'server-password'}),
-                ('listener', {'port': cls.tester.get_port()}),
-                ('listener', {'port': cls.tester.get_port(), 'sslProfile': 
'server-ssl', 'authenticatePeer': 'no', 'requireSsl': 'yes'}),
-                ('listener', {'port': cls.tester.get_port(), 'sslProfile': 
'server-ssl', 'authenticatePeer': 'no', 'requireSsl': 'no'}),
-                ('listener', {'port': cls.tester.get_port(), 'sslProfile': 
'server-ssl', 'authenticatePeer': 'yes', 'requireSsl': 'yes',
-                              'saslMechanisms': 'EXTERNAL'})
-            ])
-            cls.router = cls.tester.qdrouterd('test-router', config)
-
-        def run_qdstat(self, args, regexp=None, address=None):
-            p = self.popen(
-                ['qdstat', '--bus', str(address or self.router.addresses[0]), 
'--timeout', str(system_test.TIMEOUT)] + args,
-                name='qdstat-' + self.id(), stdout=PIPE, expect=None,
-                universal_newlines=True)
-            out = p.communicate()[0]
-            assert p.returncode == 0, \
-                "qdstat exit status %s, output:\n%s" % (p.returncode, out)
-            if regexp:
-                assert re.search(regexp, out, re.I), "Can't find '%s' in '%s'" 
% (regexp, out)
-            return out
-
-        def ssl_test(self, url_name, arg_names):
-            """Run simple SSL connection test with supplied parameters.
-            See test_ssl_* below.
-            """
-            args = dict(
-                trustfile=['--ssl-trustfile', 
self.ssl_file('ca-certificate.pem')],
-                bad_trustfile=['--ssl-trustfile', 
self.ssl_file('bad-ca-certificate.pem')],
-                client_cert=['--ssl-certificate', 
self.ssl_file('client-certificate.pem')],
-                client_key=['--ssl-key', 
self.ssl_file('client-private-key.pem')],
-                client_pass=['--ssl-password', 'client-password'])
-            args['client_cert_all'] = args['client_cert'] + args['client_key'] 
+ args['client_pass']
-
-            addrs = [self.router.addresses[i] for i in range(4)]
-            urls = dict(zip(['none', 'strict', 'unsecured', 'auth'], addrs))
-            urls.update(zip(['none_s', 'strict_s', 'unsecured_s', 'auth_s'],
-                            (Url(a, scheme="amqps") for a in addrs)))
-
-            self.run_qdstat(['--general'] + sum([args[n] for n in arg_names], 
[]),
-                            regexp=r'(?s)Router Statistics.*Mode\s*Standalone',
-                            address=str(urls[url_name]))
-
-        def ssl_test_bad(self, url_name, arg_names):
-            self.assertRaises(AssertionError, self.ssl_test, url_name, 
arg_names)
-
-        @unittest.skipIf(not SASL.extended(), "Cyrus library not available. 
skipping test")
-        def test_ssl_cert_to_auth_fail_no_sasl_external(self):
-            self.ssl_test_bad('auth_s', ['client_cert_all'])
-
-        def test_ssl_trustfile_cert_to_auth_fail_no_sasl_external(self):
-            self.ssl_test_bad('auth_s', ['trustfile', 'client_cert_all'])
-
-
-except SSLUnavailable:
-    class QdstatSslTest(system_test.TestCase):
-        def test_skip(self):
-            self.skipTest("Proton SSL support unavailable.")
+    def ssl_test(self, url_name, arg_names):
+        """Run simple SSL connection test with supplied parameters.
+        See test_ssl_* below.
+        """
+        args = dict(
+            trustfile=['--ssl-trustfile', self.ssl_file('ca-certificate.pem')],
+            bad_trustfile=['--ssl-trustfile', 
self.ssl_file('bad-ca-certificate.pem')],
+            client_cert=['--ssl-certificate', 
self.ssl_file('client-certificate.pem')],
+            client_key=['--ssl-key', self.ssl_file('client-private-key.pem')],
+            client_pass=['--ssl-password', 'client-password'])
+        args['client_cert_all'] = args['client_cert'] + args['client_key'] + 
args['client_pass']
+
+        addrs = [self.router.addresses[i] for i in range(4)]
+        urls = dict(zip(['none', 'strict', 'unsecured', 'auth'], addrs))
+        urls.update(zip(['none_s', 'strict_s', 'unsecured_s', 'auth_s'],
+                        (Url(a, scheme="amqps") for a in addrs)))
+
+        self.run_qdstat(['--general'] + sum([args[n] for n in arg_names], []),
+                        address=str(urls[url_name]),
+                        regex=r'(?s)Router Statistics.*Mode\s*Standalone')
+
+    def ssl_test_bad(self, url_name, arg_names):
+        self.assertRaises(RuntimeError, self.ssl_test, url_name, arg_names)
+
+    @unittest.skipIf(not SASL.extended(), "Cyrus library not available. 
skipping test")
+    def test_ssl_cert_to_auth_fail_no_sasl_external(self):
+        self.ssl_test_bad('auth_s', ['client_cert_all'])
+
+    def test_ssl_trustfile_cert_to_auth_fail_no_sasl_external(self):
+        self.ssl_test_bad('auth_s', ['trustfile', 'client_cert_all'])
 
 
 if __name__ == '__main__':

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to