Some testing comments (in addition to those made on IRC); I will now review the code itself.
Diff comments: > === added file 'cloudinit/config/cc_rh_subscription.py' > --- cloudinit/config/cc_rh_subscription.py 1970-01-01 00:00:00 +0000 > +++ cloudinit/config/cc_rh_subscription.py 2015-05-21 18:38:12 +0000 > @@ -0,0 +1,402 @@ > +# vi: ts=4 expandtab > +# > +# Copyright (C) Red Hat, Inc. This needs a year (presumably 2015 :p). > +# > +# Author: Brent Baude <[email protected]> > +# > +# This program is free software: you can redistribute it and/or modify > +# it under the terms of the GNU General Public License version 3, as > +# published by the Free Software Foundation. > +# > +# This program is distributed in the hope that it will be useful, > +# but WITHOUT ANY WARRANTY; without even the implied warranty of > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > +# GNU General Public License for more details. > +# > +# You should have received a copy of the GNU General Public License > +# along with this program. If not, see <http://www.gnu.org/licenses/>. > + > +import itertools > +from cloudinit import util > + > + > +def handle(_name, cfg, _cloud, log, _args): > + sm = SubscriptionManager(cfg) > + sm.log = log > + if not sm.is_registered: > + try: > + verify, verify_msg = sm._verify_keys() > + if verify is not True: > + raise SubscriptionError(verify_msg) > + cont = sm.rhn_register() > + if not cont: > + raise SubscriptionError("Registration failed or did not " > + "run completely") > + > + # Splitting up the registration, auto-attach, and servicelevel > + # commands because the error codes, messages from subman are not > + # specific enough. 
> + > + # Attempt to change the service level > + if sm.auto_attach and sm.servicelevel is not None: > + if not sm._set_service_level(): > + raise SubscriptionError("Setting of service-level " > + "failed") > + else: > + sm.log.debug("Completed auto-attach with service level") > + elif sm.auto_attach: > + if not sm._set_auto_attach(): > + raise SubscriptionError("Setting auto-attach failed") > + else: > + sm.log.debug("Completed auto-attach") > + > + if sm.pools is not None: > + if type(sm.pools) is not list: > + pool_fail = "Pools must in the format of a list" > + raise SubscriptionError(pool_fail) > + > + return_stat = sm.addPool(sm.pools) > + if not return_stat: > + raise SubscriptionError("Unable to attach pools {0}" > + .format(sm.pools)) > + if (sm.enable_repo is not None) or (sm.disable_repo is not None): > + return_stat = sm.update_repos(sm.enable_repo, > sm.disable_repo) > + if not return_stat: > + raise SubscriptionError("Unable to add or remove repos") > + sm.log.info("rh_subscription plugin completed successfully") > + except SubscriptionError as e: > + sm.log.warn(str(e)) > + sm.log.warn("rh_subscription plugin did not complete > successfully") > + else: > + sm.log.info("System is already registered") > + > + > +class SubscriptionError(Exception): > + pass > + > + > +class SubscriptionManager(object): > + def __init__(self, cfg): > + self.cfg = cfg > + self.rhel_cfg = self.cfg.get('rh_subscription', {}) > + self.rhsm_baseurl = self.rhel_cfg.get('rhsm-baseurl') > + self.server_hostname = self.rhel_cfg.get('server-hostname') > + self.pools = self.rhel_cfg.get('add-pool') > + self.activation_key = self.rhel_cfg.get('activation-key') > + self.org = self.rhel_cfg.get('org') > + self.userid = self.rhel_cfg.get('username') > + self.password = self.rhel_cfg.get('password') > + self.auto_attach = self.rhel_cfg.get('auto-attach') > + self.enable_repo = self.rhel_cfg.get('enable-repo') > + self.disable_repo = self.rhel_cfg.get('disable-repo') > + 
self.servicelevel = self.rhel_cfg.get('service-level') > + self.subman = ['subscription-manager'] > + self.valid_rh_keys = ['org', 'activation-key', 'username', > 'password', > + 'disable-repo', 'enable-repo', 'add-pool', > + 'rhsm-baseurl', 'server-hostname', > + 'auto-attach', 'service-level'] > + self.is_registered = self._is_registered() > + > + def _verify_keys(self): > + ''' > + Checks that the keys in the rh_subscription dict from the user-data > + are what we expect. > + ''' > + > + for k in self.rhel_cfg: > + if k not in self.valid_rh_keys: > + bad_key = "{0} is not a valid key for rh_subscription. "\ > + "Valid keys are: "\ > + "{1}".format(k, ', '.join(self.valid_rh_keys)) > + return False, bad_key > + > + # Check for bad auto-attach value > + if (self.auto_attach is not None) and \ > + (str(self.auto_attach).upper() not in ['TRUE', 'FALSE']): > + not_bool = "The key auto-attach must be a value of "\ > + "either True or False" > + return False, not_bool > + > + if (self.servicelevel is not None) and \ > + ((not self.auto_attach) or > + (str(self.auto_attach).upper() == "FALSE")): > + > + no_auto = "The service-level key must be used in conjunction > with "\ > + "the auto-attach key. Please re-run with auto-attach: > "\ > + "True" > + return False, no_auto > + return True, None > + > + def _is_registered(self): > + ''' > + Checks if the system is already registered and returns > + True if so, else False > + ''' > + cmd = list(itertools.chain(self.subman, ['identity'])) > + > + try: > + self._sub_man_cli(cmd) > + except util.ProcessExecutionError: > + return False > + > + return True > + > + def _sub_man_cli(self, cmd, logstring_val=False): > + ''' > + Uses the prefered cloud-init subprocess def of util.subp > + and runs subscription-manager. 
Breaking this to a > + separate function for later use in mocking and unittests > + ''' > + return_out, return_err = util.subp(cmd, logstring=logstring_val) > + > + return return_out, return_err > + > + def rhn_register(self): > + ''' > + Registers the system by userid and password or activation key > + and org. Returns True when successful False when not. > + ''' > + > + if (self.activation_key is not None) and (self.org is not None): > + # register by activation key > + cmd = list(itertools.chain(self.subman, ['register', > + '--activationkey={0}'. > + format(self.activation_key), > + '--org={0}'.format(self.org)])) > + > + # If the baseurl and/or server url are passed in, we register > + # with them. > + > + if self.rhsm_baseurl is not None: > + cmd.append("--baseurl={0}".format(self.rhsm_baseurl)) > + > + if self.server_hostname is not None: > + cmd.append("--serverurl={0}".format(self.server_hostname)) > + > + try: > + return_out, return_err = self._sub_man_cli(cmd, > + > logstring_val=True) > + except util.ProcessExecutionError as e: > + if e.stdout == "": > + self.log.warn("Registration failed due " > + "to: {0}".format(e.stderr)) > + return False > + > + elif (self.userid is not None) and (self.password is not None): > + # register by username and password > + cmd = list(itertools.chain(self.subman, ['register', > + '--username={0}'.format(self.userid), > + '--password={0}'.format(self.password)])) > + > + # If the baseurl and/or server url are passed in, we register > + # with them. 
> + > + if self.rhsm_baseurl is not None: > + cmd.append("--baseurl={0}".format(self.rhsm_baseurl)) > + > + if self.server_hostname is not None: > + cmd.append("--serverurl={0}".format(self.server_hostname)) > + > + # Attempting to register the system only > + try: > + return_out, return_err = self._sub_man_cli(cmd, > + > logstring_val=True) > + except util.ProcessExecutionError as e: > + if e.stdout == "": > + self.log.warn("Registration failed due " > + "to: {0}".format(e.stderr)) > + return False > + > + else: > + self.log.warn("Unable to register system due to incomplete " > + "information.") > + self.log.warn("Use either activationkey and org *or* userid " > + "and password") > + return False > + > + reg_id = return_out.split("ID: ")[1].rstrip() > + self.log.debug("Registered successfully with ID {0}".format(reg_id)) > + return True > + > + def _set_service_level(self): > + cmd = list(itertools.chain(self.subman, > + ['attach', '--auto', '--servicelevel={0}' > + .format(self.servicelevel)])) > + > + try: > + return_out, return_err = self._sub_man_cli(cmd) > + except util.ProcessExecutionError as e: > + if e.stdout.rstrip() != '': > + for line in e.stdout.split("\n"): > + if line is not '': > + self.log.warn(line) > + else: > + self.log.warn("Setting the service level failed with: " > + "{0}".format(e.stderr.strip())) > + return False > + for line in return_out.split("\n"): > + if line is not "": > + self.log.debug(line) > + return True > + > + def _set_auto_attach(self): > + cmd = list(itertools.chain(self.subman, ['attach', '--auto'])) > + try: > + return_out, return_err = self._sub_man_cli(cmd) > + except util.ProcessExecutionError: > + self.log.warn("Auto-attach failed with: " > + "{0}]".format(return_err.strip())) > + return False > + for line in return_out.split("\n"): > + if line is not "": > + self.log.debug(line) > + return True > + > + def _getPools(self): > + ''' > + Gets the list pools for the active subscription and returns them > + in list form. 
> + ''' > + available = [] > + consumed = [] > + > + # Get all available pools > + cmd = list(itertools.chain(self.subman, ['list', '--available', > + '--pool-only'])) > + results, errors = self._sub_man_cli(cmd) > + available = (results.rstrip()).split("\n") > + > + # Get all consumed pools > + cmd = list(itertools.chain(self.subman, ['list', '--consumed', > + '--pool-only'])) > + results, errors = self._sub_man_cli(cmd) > + consumed = (results.rstrip()).split("\n") > + > + return available, consumed > + > + def _getRepos(self): > + ''' > + Obtains the current list of active yum repositories and returns > + them in list form. > + ''' > + > + cmd = list(itertools.chain(self.subman, ['repos', '--list-enabled'])) > + return_out, return_err = self._sub_man_cli(cmd) > + active_repos = [] > + for repo in return_out.split("\n"): > + if "Repo ID:" in repo: > + active_repos.append((repo.split(':')[1]).strip()) > + > + cmd = list(itertools.chain(self.subman, ['repos', > '--list-disabled'])) > + return_out, return_err = self._sub_man_cli(cmd) > + > + inactive_repos = [] > + for repo in return_out.split("\n"): > + if "Repo ID:" in repo: > + inactive_repos.append((repo.split(':')[1]).strip()) > + return active_repos, inactive_repos > + > + def addPool(self, pools): > + ''' > + Takes a list of subscription pools and "attaches" them to the > + current subscription > + ''' > + > + # An empty list was passed > + if len(pools) == 0: > + self.log.debug("No pools to attach") > + return True > + > + pool_available, pool_consumed = self._getPools() > + pool_list = [] > + cmd = list(itertools.chain(self.subman, ['attach'])) > + for pool in pools: > + if (pool not in pool_consumed) and (pool in pool_available): > + pool_list.append('--pool={0}'.format(pool)) > + else: > + self.log.warn("Pool {0} is not available".format(pool)) > + if len(pool_list) > 0: > + cmd.extend(pool_list) > + try: > + self._sub_man_cli(cmd) > + self.log.debug("Attached the following pools to your " > + "system: 
%s" % (", ".join(pool_list)) > + .replace('--pool=', '')) > + return True > + except util.ProcessExecutionError as e: > + self.log.warn("Unable to attach pool {0} " > + "due to {1}".format(pool, e)) > + return False > + > + def update_repos(self, erepos, drepos): > + ''' > + Takes a list of yum repo ids that need to be disabled or enabled; > then > + it verifies if they are already enabled or disabled and finally > + executes the action to disable or enable > + ''' > + > + if (erepos is not None) and (type(erepos) is not list): > + self.log.warn("Repo IDs must in the format of a list.") > + return False > + > + if (drepos is not None) and (type(drepos) is not list): > + self.log.warn("Repo IDs must in the format of a list.") > + return False > + > + # Bail if both lists are not populated > + if (len(erepos) == 0) and (len(drepos) == 0): > + self.log.debug("No repo IDs to enable or disable") > + return True > + > + active_repos, inactive_repos = self._getRepos() > + # Creating a list of repoids to be enabled > + enable_list = [] > + enable_list_fail = [] > + for repoid in erepos: > + if (repoid in inactive_repos): > + enable_list.append("--enable={0}".format(repoid)) > + else: > + enable_list_fail.append(repoid) > + > + # Creating a list of repoids to be disabled > + disable_list = [] > + disable_list_fail = [] > + for repoid in drepos: > + if repoid in active_repos: > + disable_list.append("--disable={0}".format(repoid)) > + else: > + disable_list_fail.append(repoid) > + > + # Logging any repos that are already enabled or disabled > + if len(enable_list_fail) > 0: > + for fail in enable_list_fail: > + # Check if the repo exists or not > + if fail in active_repos: > + self.log.debug("Repo {0} is already > enabled".format(fail)) > + else: > + self.log.warn("Repo {0} does not appear to " > + "exist".format(fail)) > + if len(disable_list_fail) > 0: > + for fail in disable_list_fail: > + self.log.debug("Repo {0} not disabled " > + "because it is not 
enabled".format(fail)) > + > + cmd = list(itertools.chain(self.subman, ['repos'])) > + if enable_list > 0: > + cmd.extend(enable_list) > + if disable_list > 0: > + cmd.extend(disable_list) > + > + try: > + self._sub_man_cli(cmd) > + except util.ProcessExecutionError as e: > + self.log.warn("Unable to alter repos due to {0}".format(e)) > + return False > + > + if enable_list > 0: > + self.log.debug("Enabled the following repos: %s" % > + (", ".join(enable_list)).replace('--enable=', '')) > + if disable_list > 0: > + self.log.debug("Disabled the following repos: %s" % > + (", ".join(disable_list)).replace('--disable=', > '')) > + return True > > === added file 'doc/examples/cloud-config-rh_subscription.txt' > --- doc/examples/cloud-config-rh_subscription.txt 1970-01-01 00:00:00 > +0000 > +++ doc/examples/cloud-config-rh_subscription.txt 2015-05-21 18:38:12 > +0000 > @@ -0,0 +1,49 @@ > +#cloud-config > + > +# register your Red Hat Enterprise Linux based operating system > +# > +# this cloud-init plugin is capable of registering by username > +# and password *or* activation and org. Following a successfully > +# registration you can: > +# - auto-attach subscriptions > +# - set the service level > +# - add subscriptions based on its pool ID > +# - enable yum repositories based on its repo id > +# - disable yum repositories based on its repo id > +# - alter the rhsm_baseurl and server-hostname in the > +# /etc/rhsm/rhs.conf file > + > +rh_subscription: > + username: [email protected] > + > + ## Quote your password if it has symbols to be safe > + password: '1234abcd' > + > + ## If you prefer, you can use the activation key and > + ## org instead of username and password. 
Be sure to > + ## comment out username and password > + > + #activation-key: foobar > + #org: 12345 > + > + ## Uncomment to auto-attach subscriptions to your system > + #auto-attach: True > + > + ## Uncomment to set the service level for your > + ## subscriptions > + #service-level: self-support > + > + ## Uncomment to add pools (needs to be a list of IDs) > + #add-pool: [] > + > + ## Uncomment to add or remove yum repos > + ## (needs to be a list of repo IDs) > + #enable-repo: [] > + #disable-repo: [] > + > + ## Uncomment to alter the baseurl in /etc/rhsm/rhsm.conf > + #rhsm-baseurl: http://url > + > + ## Uncomment to alter the server hostname in > + ## /etc/rhsm/rhsm.conf > + #server-hostname: foo.bar.com > > === added file 'tests/unittests/test_rh_subscription.py' > --- tests/unittests/test_rh_subscription.py 1970-01-01 00:00:00 +0000 > +++ tests/unittests/test_rh_subscription.py 2015-05-21 18:38:12 +0000 > @@ -0,0 +1,231 @@ > +from cloudinit import util > +from cloudinit.config import cc_rh_subscription > +import logging > +import mock > +import unittest > + > + > +class GoodTests(unittest.TestCase): > + def setUp(self): > + super(GoodTests, self).setUp() > + self.name = "cc_rh_subscription" > + self.cloud_init = None > + self.log = logging.getLogger("good_tests") > + self.args = [] > + self.handle = cc_rh_subscription.handle > + self.SM = cc_rh_subscription.SubscriptionManager > + > + self.config = {'rh_subscription': > + {'username': '[email protected]', > + 'password': 'scooby-snacks' > + }} > + self.config_full = {'rh_subscription': > + {'username': '[email protected]', > + 'password': 'scooby-snacks', > + 'auto-attach': True, > + 'service-level': 'self-support', > + 'add-pool': ['pool1', 'pool2', 'pool3'], > + 'enable-repo': ['repo1', 'repo2', 'repo3'], > + 'disable-repo': ['repo4', 'repo5'] > + }} > + > + def test_already_registered(self): > + ''' > + Emulates a system that is already registered. 
Ensure it gets > + a non-ProcessExecution error from is_registered() > + ''' > + good_message = 'System is already registered' > + with mock.patch.object(cc_rh_subscription.SubscriptionManager, > + '_sub_man_cli') as mockobj: > + self.log.info = mock.MagicMock(wraps=self.log.info) > + self.handle(self.name, self.config, self.cloud_init, > + self.log, self.args) > + self.assertEqual(mockobj.call_count, 1) > + self.log.info.assert_called_with(good_message) > + > + def test_simple_registration(self): > + ''' > + Simple registration with username and password > + ''' > + good_message = 'rh_subscription plugin completed successfully' > + self.log.info = mock.MagicMock(wraps=self.log.info) > + reg = "The system has been registered with ID:" \ > + " 12345678-abde-abcde-1234-1234567890abc" > + self.SM._sub_man_cli = mock.MagicMock( > + side_effect=[util.ProcessExecutionError, (reg, 'bar')]) > + self.handle(self.name, self.config, self.cloud_init, > + self.log, self.args) > + self.SM._sub_man_cli.assert_called_with_once(['subscription-manager', > + 'identity']) > + self.SM._sub_man_cli.assert_called_with_once( > + ['subscription-manager', 'register', '[email protected]', > + '--password=scooby-snacks'], logstring_val=True) > + > + self.log.info.assert_called_with(good_message) > + self.assertEqual(self.SM._sub_man_cli.call_count, 2) > + > + def test_full_registration(self): > + ''' > + Registration with auto-attach, service-level, adding pools, > + and enabling and disabling yum repos > + ''' > + pool_message = 'Pool pool2 is not available' > + repo_message1 = 'Repo repo1 is already enabled' > + repo_message2 = 'Enabled the following repos: repo2, repo3' > + good_message = 'rh_subscription plugin completed successfully' > + self.log.warn = mock.MagicMock(wraps=self.log.warn) > + self.log.debug = mock.MagicMock(wraps=self.log.debug) > + reg = "The system has been registered with ID:" \ > + " 12345678-abde-abcde-1234-1234567890abc" > + self.SM._sub_man_cli = mock.MagicMock( > 
+ side_effect=[util.ProcessExecutionError, (reg, 'bar'), > + ('Service level set to: self-support', ''), > + ('pool1\npool3\n', ''), ('pool2\n', ''), ('', ''), > + ('Repo ID: repo1\nRepo ID: repo5\n', ''), > + ('Repo ID: repo2\nRepo ID: repo3\nRepo ID: ' > + 'repo4', ''), > + ('', '')]) > + self.handle(self.name, self.config_full, self.cloud_init, > + self.log, self.args) > + self.log.warn.assert_any_call(pool_message) > + self.log.debug.assert_any_call(repo_message1) > + self.log.debug.assert_any_call(repo_message2) > + self.log.info.assert_any_call(good_message) > + self.SM._sub_man_cli.assert_called_with_once(['subscription-manager', > + 'attach', > '-pool=pool1', > + '--pool=pool33']) > + self.assertEqual(self.SM._sub_man_cli.call_count, 9) > + > + > +class BadTests(unittest.TestCase): I'd suggest renaming these; if they're bad tests, why keep 'em? :p > + def setUp(self): > + super(BadTests, self).setUp() > + self.name = "cc_rh_subscription" Attributes that aren't changed in test runs (particularly those that are constants shared between the two test classes) are probably better off as module-level variables; it makes it easier to look at each test and understand what's happening. (If I see self.name, I wonder where else it is set/changed and why the test cares about it changing; if I see NAME, I know it's a constant that the test is checking). 
> + self.cloud_init = None > + self.log = logging.getLogger("bad_tests") > + self.orig = self.log > + self.args = [] > + self.handle = cc_rh_subscription.handle > + self.SM = cc_rh_subscription.SubscriptionManager > + self.reg = "The system has been registered with ID:" \ > + " 12345678-abde-abcde-1234-1234567890abc" > + > + self.config_no_password = {'rh_subscription': > + {'username': '[email protected]' > + }} > + > + self.config_no_key = {'rh_subscription': > + {'activation-key': '1234abcde', > + }} > + > + self.config_service = {'rh_subscription': > + {'username': '[email protected]', > + 'password': 'scooby-snacks', > + 'service-level': 'self-support' > + }} > + > + self.config_badpool = {'rh_subscription': > + {'username': '[email protected]', > + 'password': 'scooby-snacks', > + 'add-pool': 'not_a_list' > + }} > + self.config_badrepo = {'rh_subscription': > + {'username': '[email protected]', > + 'password': 'scooby-snacks', > + 'enable-repo': 'not_a_list' > + }} > + self.config_badkey = {'rh_subscription': > + {'activation_key': 'abcdef1234', > + 'org': '123', > + }} > + > + def test_no_password(self): > + ''' > + Attempt to register without the password key/value > + ''' > + self.missing_info(self.config_no_password) > + > + def test_no_org(self): > + ''' > + Attempt to register without the org key/value > + ''' > + self.missing_info(self.config_no_key) > + > + def test_service_level_without_auto(self): > + ''' > + Attempt to register using service-level without the auto-attach key > + ''' > + good_message = 'The service-level key must be used in conjunction'\ > + ' with the auto-attach key. 
Please re-run with '\ > + 'auto-attach: True' > + > + self.log.warn = mock.MagicMock(wraps=self.log.warn) > + self.SM._sub_man_cli = mock.MagicMock( > + side_effect=[util.ProcessExecutionError, (self.reg, 'bar')]) > + self.handle(self.name, self.config_service, self.cloud_init, > + self.log, self.args) > + self.log.warn.assert_any_call(good_message) > + self.assertRaises(cc_rh_subscription.SubscriptionError) This isn't testing anything; it only isn't erroring because it thinks you're instantiating a context manager. :) > + self.assertEqual(self.SM._sub_man_cli.call_count, 1) > + > + def test_pool_not_a_list(self): > + ''' > + Register with pools that are not in the format of a list > + ''' > + good_message = "Pools must in the format of a list" > + self.log.warn = mock.MagicMock(wraps=self.log.warn) > + self.SM._sub_man_cli = mock.MagicMock( > + side_effect=[util.ProcessExecutionError, (self.reg, 'bar')]) > + self.handle(self.name, self.config_badpool, self.cloud_init, > + self.log, self.args) > + self.log.warn.assert_any_call(good_message) > + self.assertRaises(cc_rh_subscription.SubscriptionError) > + self.assertEqual(self.SM._sub_man_cli.call_count, 2) > + > + def test_repo_not_a_list(self): > + ''' > + Register with repos that are not in the format of a list > + ''' > + good_message = "Repo IDs must in the format of a list." 
> + self.log.warn = mock.MagicMock(wraps=self.log.warn) > + self.SM._sub_man_cli = mock.MagicMock( > + side_effect=[util.ProcessExecutionError, (self.reg, 'bar')]) > + self.handle(self.name, self.config_badrepo, self.cloud_init, > + self.log, self.args) > + self.log.warn.assert_any_call(good_message) > + self.assertRaises(cc_rh_subscription.SubscriptionError) > + self.assertEqual(self.SM._sub_man_cli.call_count, 2) > + > + def test_bad_key_value(self): > + ''' > + Attempt to register with a key that we don't know > + ''' > + good_message = 'activation_key is not a valid key for > rh_subscription.'\ > + ' Valid keys are: org, activation-key, username, '\ > + 'password, disable-repo, enable-repo, add-pool,'\ > + ' rhsm-baseurl, server-hostname, auto-attach, '\ > + 'service-level' > + self.log.warn = mock.MagicMock(wraps=self.log.warn) > + self.SM._sub_man_cli = mock.MagicMock( > + side_effect=[util.ProcessExecutionError, (self.reg, 'bar')]) > + self.handle(self.name, self.config_badkey, self.cloud_init, > + self.log, self.args) > + self.assertRaises(cc_rh_subscription.SubscriptionError) > + self.log.warn.assert_any_call(good_message) > + self.assertEqual(self.SM._sub_man_cli.call_count, 1) > + > + def missing_info(self, config): I'd give this a more descriptive name; perhaps check_missing_info_causes_log_message. > + ''' > + Helper def for tests that having missing information > + ''' > + good_message = "Unable to register system due to incomplete "\ > + "information." 
> + self.log.warn = mock.MagicMock(wraps=self.log.warn) > + self.SM._sub_man_cli = mock.MagicMock( > + side_effect=[util.ProcessExecutionError]) > + self.handle(self.name, config, self.cloud_init, > + self.log, self.args) > + self.SM._sub_man_cli.assert_called_with(['subscription-manager', > + 'identity']) > + self.log.warn.assert_any_call(good_message) > + self.assertEqual(self.SM._sub_man_cli.call_count, 1) > -- https://code.launchpad.net/~bbaude/cloud-init/rh_subscription/+merge/259159 Your team cloud init development team is requested to review the proposed merge of lp:~bbaude/cloud-init/rh_subscription into lp:cloud-init. _______________________________________________ Mailing list: https://launchpad.net/~cloud-init-dev Post to : [email protected] Unsubscribe : https://launchpad.net/~cloud-init-dev More help : https://help.launchpad.net/ListHelp

