Francis Ginther has proposed merging ~fginther/cloud-init:feature/ssh_disable_users into cloud-init:master.
Commit message: Add a configuration option, 'ssh_disable_users', for declaring a list of usernames to disable login via ssh and redirect to the default user. Also adds unit tests for config/cc_ssh.py to verify both the pre-existing and the new behavior. Requested reviews: cloud-init commiters (cloud-init-dev) Related bugs: Bug #1771198 in cloud-init: "Support disable_root-esque behaviour for other users" https://bugs.launchpad.net/cloud-init/+bug/1771198 For more details, see: https://code.launchpad.net/~fginther/cloud-init/+git/cloud-init/+merge/352053 This re-implements the 'disable_root' option for a list of users, instead of just root. Testing is provided through unit tests. These did not exist for the config/cc_ssh.py module, so a basic set of tests was created to cover the existing 'disable_root' behavior. These tests were then expanded and modified to match the 'ssh_disable_users' implementation. The 'disable_root: true' option still exists, but it will be converted to 'ssh_disable_users: ["root"]' for processing. -- Your team cloud-init commiters is requested to review the proposed merge of ~fginther/cloud-init:feature/ssh_disable_users into cloud-init:master.
diff --git a/cloudinit/config/cc_ssh.py b/cloudinit/config/cc_ssh.py old mode 100755 new mode 100644 index 45204a0..52c819b --- a/cloudinit/config/cc_ssh.py +++ b/cloudinit/config/cc_ssh.py @@ -55,6 +55,11 @@ root login is disabled, and root login opts are set to:: no-port-forwarding,no-agent-forwarding,no-X11-forwarding +Login for other users can similarly be disabled with the ``ssh_disable_users`` +config list. Users in this list will have the same ``disable_root_opts`` +applied and references to the string ``$ROOT`` will be replace with the user +being redirected. + Authorized keys for the default user/first user defined in ``users`` can be specified using `ssh_authorized_keys``. Keys should be specified as a list of public keys. @@ -87,6 +92,7 @@ public keys. dsa_public: ssh-dsa AAAAB3NzaC1yc2EAAAABIwAAAGEAoPRhIfLvedSDKw7Xd ... ssh_genkeytypes: <key type> disable_root: <true/false> + ssh_disable_users: <list of user to block and redirect to the default user> disable_root_opts: <disable root options string> ssh_authorized_keys: - ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEA3FSyQwBI6Z+nCSjUU ... 
@@ -104,7 +110,7 @@ from cloudinit import util DISABLE_ROOT_OPTS = ( "no-port-forwarding,no-agent-forwarding," "no-X11-forwarding,command=\"echo \'Please login as the user \\\"$USER\\\"" - " rather than the user \\\"root\\\".\';echo;sleep 10\"") + " rather than the user \\\"$ROOT\\\".\';echo;sleep 10\"") GENERATE_KEY_NAMES = ['rsa', 'dsa', 'ecdsa', 'ed25519'] KEY_FILE_TPL = '/etc/ssh/ssh_host_%s_key' @@ -183,33 +189,45 @@ def handle(_name, cfg, cloud, log, _args): try: (users, _groups) = ug_util.normalize_users_groups(cfg, cloud.distro) (user, _user_config) = ug_util.extract_default(users) + ssh_disable_users = util.get_cfg_option_list(cfg, "ssh_disable_users", + []) disable_root = util.get_cfg_option_bool(cfg, "disable_root", True) disable_root_opts = util.get_cfg_option_str(cfg, "disable_root_opts", DISABLE_ROOT_OPTS) + ssh_disable_users = list(set(ssh_disable_users).difference(set(users))) + for ssh_user in ssh_disable_users: + cloud.distro.create_user(ssh_user, **cfg) + + if disable_root: + ssh_disable_users.append("root") + keys = cloud.get_public_ssh_keys() or [] if "ssh_authorized_keys" in cfg: cfgkeys = cfg["ssh_authorized_keys"] keys.extend(cfgkeys) - apply_credentials(keys, user, disable_root, disable_root_opts) + apply_credentials(keys, user, ssh_disable_users, disable_root_opts) except Exception: util.logexc(log, "Applying ssh credentials failed!") -def apply_credentials(keys, user, disable_root, disable_root_opts): +def apply_credentials(keys, user, ssh_disable_users, disable_root_opts): keys = set(keys) + ssh_disable_users = set(ssh_disable_users) if user: ssh_util.setup_user_keys(keys, user) - if disable_root: - if not user: - user = "NONE" - key_prefix = disable_root_opts.replace('$USER', user) - else: - key_prefix = '' + if 'root' not in ssh_disable_users: + ssh_util.setup_user_keys(keys, 'root', options='') + + if not user: + user = "NONE" + key_prefix = disable_root_opts.replace('$USER', user) - ssh_util.setup_user_keys(keys, 'root', 
options=key_prefix) + for disable_user in ssh_disable_users: + disable_prefix = key_prefix.replace('$ROOT', disable_user) + ssh_util.setup_user_keys(keys, disable_user, options=disable_prefix) # vi: ts=4 expandtab diff --git a/cloudinit/config/tests/test_ssh.py b/cloudinit/config/tests/test_ssh.py new file mode 100644 index 0000000..3da4330 --- /dev/null +++ b/cloudinit/config/tests/test_ssh.py @@ -0,0 +1,277 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import mock + +from cloudinit.config import cc_ssh +from cloudinit.tests.helpers import CiTestCase + +MODPATH = "cloudinit.config.cc_ssh." + + +class TestHandleSsh(CiTestCase): + """Test cc_ssh handling of ssh config.""" + + with_logs = True + + def _assert_has_calls(self, mock, expected_list): + """Compare the expected call list with the mock's in any order.""" + mock_list = mock.call_args_list[:] + for expected_call in expected_list: + if expected_call in mock_list: + mock_list.pop(mock_list.index(expected_call)) + else: + self.assertIn(expected_call, mock_list) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + def test_apply_credentials_with_user(self, m_suk): + """Apply keys for the given user and root.""" + keys = ["key1"] + user = "clouduser" + options = cc_ssh.DISABLE_ROOT_OPTS + cc_ssh.apply_credentials(keys, user, [], options) + self.assertEqual([mock.call(set(keys), user), + mock.call(set(keys), "root", options="")], + m_suk.call_args_list) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + def test_apply_credentials_with_no_user(self, m_suk): + """Apply keys for root only.""" + keys = ["key1"] + user = None + options = cc_ssh.DISABLE_ROOT_OPTS + cc_ssh.apply_credentials(keys, user, [], options) + self.assertEqual([mock.call(set(keys), "root", options="")], + m_suk.call_args_list) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + def test_apply_credentials_with_user_disable_root(self, m_suk): + """Apply keys for the given user and disable root ssh.""" 
+ keys = ["key1"] + user = "clouduser" + options = cc_ssh.DISABLE_ROOT_OPTS + cc_ssh.apply_credentials(keys, user, ["root"], options) + options = options.replace("$USER", user) + options = options.replace("$ROOT", "root") + self.assertEqual([mock.call(set(keys), user), + mock.call(set(keys), "root", options=options)], + m_suk.call_args_list) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + def test_apply_credentials_with_no_user_disable_root(self, m_suk): + """Apply keys no user and disable root ssh.""" + keys = ["key1"] + user = None + options = cc_ssh.DISABLE_ROOT_OPTS + cc_ssh.apply_credentials(keys, user, ["root"], options) + options = options.replace("$USER", "NONE") + options = options.replace("$ROOT", "root") + self.assertEqual([mock.call(set(keys), "root", options=options)], + m_suk.call_args_list) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + def test_apply_credentials_with_user_and_disable_list(self, m_suk): + """Apply keys for the given user and disable a list of users.""" + keys = ["key1"] + user = "clouduser" + options = cc_ssh.DISABLE_ROOT_OPTS + cc_ssh.apply_credentials(keys, user, ["root", "disable_me"], options) + options = options.replace("$USER", user) + options_root = options.replace("$ROOT", "root") + options_disable_me = options.replace("$ROOT", "disable_me") + self._assert_has_calls( + m_suk, + [mock.call(set(keys), user), + mock.call(set(keys), "root", options=options_root), + mock.call(set(keys), "disable_me", options=options_disable_me)]) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + def test_apply_credentials_with_user_and_disable_non_root(self, m_suk): + """Apply keys for the given user and disable non-root users.""" + keys = ["key1"] + user = "clouduser" + options = cc_ssh.DISABLE_ROOT_OPTS + cc_ssh.apply_credentials(keys, user, ["disable_me", "disable_you"], + options) + options = options.replace("$USER", user) + options_disable_me = options.replace("$ROOT", "disable_me") + options_disable_you = 
options.replace("$ROOT", "disable_you") + print(dir(mock)) + self._assert_has_calls( + m_suk, + [mock.call(set(keys), user), + mock.call(set(keys), "root", options=""), + mock.call(set(keys), "disable_me", options=options_disable_me), + mock.call(set(keys), "disable_you", options=options_disable_you)]) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_no_cfg(self, m_ope, m_nug, m_suk): + """Test handle with no config and no distro user.""" + cfg = {} + keys = ["key1"] + # Mock os.path.exits to True to short-circuit the key writing logic + m_ope.return_value = True + m_nug.return_value = ([], {}) + cloud = mock.Mock() + cloud.distro = mock.Mock() + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) + cc_ssh.handle("name", cfg, cloud, self.logger, None) + + options = cc_ssh.DISABLE_ROOT_OPTS + options = options.replace("$USER", "NONE") + options = options.replace("$ROOT", "root") + self.assertEqual([mock.call(set(keys), "root", options=options)], + m_suk.call_args_list) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_no_cfg_and_default_root(self, m_ope, m_nug, m_suk): + """Test handle with no config and a default distro user.""" + cfg = {} + keys = ["key1"] + user = "clouduser" + # Mock os.path.exits to True to short-circuit the key writing logic + m_ope.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = mock.Mock() + cloud.distro = mock.Mock() + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) + cc_ssh.handle("name", cfg, cloud, self.logger, None) + + options = cc_ssh.DISABLE_ROOT_OPTS + options = options.replace("$USER", user) + options = options.replace("$ROOT", "root") + self._assert_has_calls( + m_suk, + [mock.call(set(keys), user), + mock.call(set(keys), "root", 
options=options)]) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_cfg_with_explicit_disable_root(self, m_ope, m_nug, m_suk): + """Test handle with explicit disable_root and a default distro user.""" + # This test is identical to test_handle_no_cfg_and_default_root, + # except this uses an explicit cfg value + cfg = {"disable_root": True} + keys = ["key1"] + user = "clouduser" + # Mock os.path.exits to True to short-circuit the key writing logic + m_ope.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = mock.Mock() + cloud.distro = mock.Mock() + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) + cc_ssh.handle("name", cfg, cloud, self.logger, None) + + options = cc_ssh.DISABLE_ROOT_OPTS + options = options.replace("$USER", user) + options = options.replace("$ROOT", "root") + self._assert_has_calls( + m_suk, + [mock.call(set(keys), user), + mock.call(set(keys), "root", options=options)]) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_cfg_with_disable_root_false(self, m_ope, m_nug, m_suk): + """Test handle with disable_root == False.""" + # When disable_root == False, the ssh redirect for root is skipped + cfg = {"disable_root": False} + keys = ["key1"] + user = "clouduser" + # Mock os.path.exits to True to short-circuit the key writing logic + m_ope.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = mock.Mock() + cloud.distro = mock.Mock() + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) + cc_ssh.handle("name", cfg, cloud, self.logger, None) + + self._assert_has_calls( + m_suk, + [mock.call(set(keys), user), + mock.call(set(keys), "root", options="")]) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + @mock.patch(MODPATH + 
"ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_cfg_with_ssh_disable_users(self, m_ope, m_nug, m_suk): + """Test handle with ssh_disable_users.""" + cfg = {"ssh_disable_users": ["disable_me"]} + keys = ["key1"] + user = "clouduser" + # Mock os.path.exits to True to short-circuit the key writing logic + m_ope.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = mock.Mock() + cloud.distro = mock.Mock() + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) + cc_ssh.handle("name", cfg, cloud, self.logger, None) + + options = cc_ssh.DISABLE_ROOT_OPTS + options = options.replace("$USER", user) + options_root = options.replace("$ROOT", "root") + options_disable_me = options.replace("$ROOT", "disable_me") + # This will include "root" as "disable_root: True" is the default + self._assert_has_calls( + m_suk, + [mock.call(set(keys), user), + mock.call(set(keys), "root", options=options_root), + mock.call(set(keys), "disable_me", options=options_disable_me)]) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_cfg_with_ssh_disable_users_and_no_root(self, m_ope, m_nug, + m_suk): + """Test handle with ssh_disable_users and disable_root == False.""" + cfg = {"disable_root": False, + "ssh_disable_users": ["disable_me"]} + keys = ["key1"] + user = "clouduser" + # Mock os.path.exits to True to short-circuit the key writing logic + m_ope.return_value = True + m_nug.return_value = ({user: {"default": user}}, {}) + cloud = mock.Mock() + cloud.distro = mock.Mock() + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) + cc_ssh.handle("name", cfg, cloud, self.logger, None) + + options = cc_ssh.DISABLE_ROOT_OPTS + options = options.replace("$USER", user) + options = options.replace("$ROOT", "disable_me") + self._assert_has_calls( + m_suk, + [mock.call(set(keys), user), + 
mock.call(set(keys), "root", options=""), + mock.call(set(keys), "disable_me", options=options)]) + + @mock.patch(MODPATH + "ssh_util.setup_user_keys") + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "os.path.exists") + def test_handle_with_matching_ssh_disable_users_and_user(self, m_ope, + m_nug, m_suk): + """Test handle with a matching user and ssh_disable_users entry.""" + matching_user = "matching" + cfg = {"disable_root": False, + "ssh_disable_users": [matching_user]} + keys = ["key1"] + user = "clouduser" + # Mock os.path.exits to True to short-circuit the key writing logic + m_ope.return_value = True + m_nug.return_value = ({user: {"default": user}, + matching_user: {"default": False}}, {}) + cloud = mock.Mock() + cloud.distro = mock.Mock() + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) + cc_ssh.handle("name", cfg, cloud, self.logger, None) + + self.assertEqual(m_suk.call_count, 2) + self._assert_has_calls( + m_suk, + [mock.call(set(keys), user), + mock.call(set(keys), "root", options="")]) diff --git a/doc/examples/cloud-config.txt b/doc/examples/cloud-config.txt index 774f66b..b6970a8 100644 --- a/doc/examples/cloud-config.txt +++ b/doc/examples/cloud-config.txt @@ -228,13 +228,25 @@ byobu_by_default: system # default: true disable_root: false +# disable ssh access for a list of users +# Block ssh login to these users and redirect to the 'ubuntu' or default distro +# user. This is useful for clouds which want to document a standard user across +# all environments, which then conflicts with the idea of a default distro +# user. This provides the hint of which user to use. 
+# default: [] +ssh_disable_users: + - cloud_default_user + # disable_root_opts: the value of this variable will prefix the # respective key in /root/.ssh/authorized_keys if disable_root is true +# and /home/$ENTRY/.ssh/authorized_keys for any ENTRY in the +# ssh_disable_users list # see 'man authorized_keys' for more information on what you can do here # # The string '$USER' will be replaced with the username of the default user +# and the string '$ROOT' will be replaced with the username being redirected. # -# disable_root_opts: no-port-forwarding,no-agent-forwarding,no-X11-forwarding,command="echo 'Please login as the user \"$USER\" rather than the user \"root\".';echo;sleep 10" +# disable_root_opts: no-port-forwarding,no-agent-forwarding,no-X11-forwarding,command="echo 'Please login as the user \"$USER\" rather than the user \"$ROOT\".';echo;sleep 10" # set the locale to a given locale
_______________________________________________ Mailing list: https://launchpad.net/~cloud-init-dev Post to : cloud-init-dev@lists.launchpad.net Unsubscribe : https://launchpad.net/~cloud-init-dev More help : https://help.launchpad.net/ListHelp