Review: Approve +1 thanks for the rework + docstrings.
Diff comments: > diff --git a/tests/unittests/test_ds_identify.py > b/tests/unittests/test_ds_identify.py > new file mode 100644 > index 0000000..de645eb > --- /dev/null > +++ b/tests/unittests/test_ds_identify.py > @@ -0,0 +1,295 @@ > +# This file is part of cloud-init. See LICENSE file for license information. > + > +import copy > +import os > + > +from cloudinit import safeyaml > +from cloudinit import util > +from .helpers import CiTestCase, dir2dict, json_dumps, populate_dir > + > +UNAME_MYSYS = ("Linux bart 4.4.0-62-generic #83-Ubuntu " > + "SMP Wed Jan 18 14:10:15 UTC 2017 x86_64 GNU/Linux") > +BLKID_EFI_ROOT = """ > +DEVNAME=/dev/sda1 > +UUID=8B36-5390 > +TYPE=vfat > +PARTUUID=30d7c715-a6ae-46ee-b050-afc6467fc452 > + > +DEVNAME=/dev/sda2 > +UUID=19ac97d5-6973-4193-9a09-2e6bbfa38262 > +TYPE=ext4 > +PARTUUID=30c65c77-e07d-4039-b2fb-88b1fb5fa1fc > +""" > + > +DI_DEFAULT_POLICY = "search,found=all,maybe=all,notfound=enabled" > +DI_DEFAULT_POLICY_NO_DMI = "search,found=all,maybe=all,notfound=disabled" > + > +SHELL_MOCK_TMPL = """\ > +%(name)s() { > + local out='%(out)s' err='%(err)s' r='%(ret)s' RET='%(RET)s' > + [ "$out" = "_unset" ] || echo "$out" > + [ "$err" = "_unset" ] || echo "$err" 2>&1 > + [ "$RET" = "_unset" ] || _RET="$RET" > + return $r > +} > +""" > + > +UUIDS = [ > + 'e7db1e51-3eff-4ce9-8bfd-e8749884034a', uuid.uuid4() in the ConfigDrive definition and we're good to go. > + '4167a101-a2fc-4071-a078-f9d89b3c548d', > + '147c295d-f5f4-4d8b-8b90-14682a6ae072', > +] > + > +RC_FOUND = 0 > +RC_NOT_FOUND = 1 > +DS_NONE = 'None' > + > +P_PRODUCT_NAME = "sys/class/dmi/id/product_name" > +P_PRODUCT_SERIAL = "sys/class/dmi/id/product_serial" > +P_PRODUCT_UUID = "sys/class/dmi/id/product_uuid" > +P_DSID_CFG = "etc/cloud/ds-identify.cfg" > + > +MOCK_VIRT_IS_KVM = {'name': 'detect_virt', 'RET': 'kvm', 'ret': 0} > + > + > +class TestDsIdentify(CiTestCase): > + dsid_path = os.path.realpath('tools/ds-identify') > + > + def call(self, rootd=None, mocks=None, args=None, files=None, > + policy_dmi=DI_DEFAULT_POLICY, > + policy_nodmi=DI_DEFAULT_POLICY_NO_DMI): > + if args is None: > + args = [] > + if mocks is None: > + mocks = [] > + > + if files is None: > + files = {} > + > + if rootd is None: > + rootd = self.tmp_dir() > + > + unset = '_unset' > + wrap = self.tmp_path(path="_shwrap", dir=rootd) > + populate_dir(rootd, files) > + > + # DI_DEFAULT_POLICY* are declared always as to not rely > + # on the default in the code. This is because SRU releases change > + # the value in the code, and thus tests would fail there. > + head = [ > + "DI_MAIN=noop", > + "DEBUG_LEVEL=2", > + "DI_LOG=stderr", > + "PATH_ROOT='%s'" % rootd, > + ". " + self.dsid_path, > + 'DI_DEFAULT_POLICY="%s"' % policy_dmi, > + 'DI_DEFAULT_POLICY_NO_DMI="%s"' % policy_nodmi, > + "" > + ] > + > + def write_mock(data): > + ddata = {'out': None, 'err': None, 'ret': 0, 'RET': None} > + ddata.update(data) > + for k in ddata: > + if ddata[k] is None: > + ddata[k] = unset > + return SHELL_MOCK_TMPL % ddata > + > + mocklines = [] > + defaults = [ > + {'name': 'detect_virt', 'RET': 'none', 'ret': 1}, > + {'name': 'uname', 'out': UNAME_MYSYS}, > + {'name': 'blkid', 'out': BLKID_EFI_ROOT}, > + ] > + > + written = [d['name'] for d in mocks] > + for data in mocks: > + mocklines.append(write_mock(data)) > + for d in defaults: > + if d['name'] not in written: > + mocklines.append(write_mock(d)) > + > + endlines = [ > + 'main %s' % ' '.join(['"%s"' % s for s in args]) > + ] > + > + with open(wrap, "w") as fp: > + fp.write('\n'.join(head + mocklines + endlines) + "\n") > + > + rc = 0 > + try: > + out, err = util.subp(['sh', '-c', '. %s' % wrap], capture=True) > + except util.ProcessExecutionError as e: > + rc = e.exit_code > + out = e.stdout > + err = e.stderr > + > + cfg = None > + cfg_out = os.path.join(rootd, 'run/cloud-init/cloud.cfg') > + if os.path.exists(cfg_out): > + contents = util.load_file(cfg_out) > + try: > + cfg = safeyaml.load(contents) > + except Exception as e: > + cfg = {"_INVALID_YAML": contents, > + "_EXCEPTION": str(e)} > + > + return rc, out, err, cfg, dir2dict(rootd) > + > + def _call_via_dict(self, data, rootd=None, **kwargs): > + # return output of self.call with a dict input like VALID_CFG[item] > + xwargs = {'rootd': rootd} > + for k in ('mocks', 'args', 'policy_dmi', 'policy_nodmi', 'files'): > + if k in data: > + xwargs[k] = data[k] > + if k in kwargs: > + xwargs[k] = kwargs[k] > + > + return self.call(**xwargs) > + > + def _test_ds_found(self, name): > + data = copy.deepcopy(VALID_CFG[name]) > + return self._check_via_dict( > + data, RC_FOUND, dslist=[data.get('ds'), DS_NONE]) > + > + def _check_via_dict(self, data, rc, dslist=None, **kwargs): > + found_rc, out, err, cfg, files = self._call_via_dict(data, **kwargs) > + good = False > + try: > + self.assertEqual(rc, found_rc) > + if dslist is not None: > + self.assertEqual(dslist, cfg['datasource_list']) > + good = True > + finally: > + if not good: > + _print_run_output(rc, out, err, cfg, files) > + return rc, out, err, cfg, files > + > + def test_aws_ec2_hvm(self): > + """EC2: hvm instances use dmi serial and uuid starting with 'ec2'.""" > + self._test_ds_found('Ec2-hvm') > + > + def test_aws_ec2_xen(self): > + """EC2: sys/hypervisor/uuid starts with ec2.""" > + self._test_ds_found('Ec2-xen') > + > + def test_brightbox_is_ec2(self): > + """EC2: product_serial ends with 'brightbox.com'""" > + self._test_ds_found('Ec2-brightbox') > + > + def test_gce_by_product_name(self): > + """GCE identifies itself with product_name.""" > + self._test_ds_found('GCE') > + > + def test_gce_by_serial(self): > + """Older gce compute instances must be identified by serial.""" > + self._test_ds_found('GCE-serial') > + > + def test_config_drive(self): > + """ConfigDrive datasource has a disk with LABEL=config-2.""" > + self._test_ds_found('ConfigDrive') > + return > + > + def test_policy_disabled(self): > + """A Builtin policy of 'disabled' should return not found. > + > + Even though a search would find something, the builtin policy of > + disabled should cause the return of not found.""" > + mydata = copy.deepcopy(VALID_CFG['Ec2-hvm']) > + self._check_via_dict(mydata, rc=RC_NOT_FOUND, policy_dmi="disabled") > + > + def test_policy_config_disable_overrides_builtin(self): > + """explicit policy: disabled in config file should cause not > found.""" > + mydata = copy.deepcopy(VALID_CFG['Ec2-hvm']) > + mydata['files'][P_DSID_CFG] = '\n'.join(['policy: disabled', '']) > + self._check_via_dict(mydata, rc=RC_NOT_FOUND) > + > + def test_single_entry_defines_datasource(self): > + """If config has a single entry in datasource_list, that is used. > + > + Test the valid Ec2-hvm, but provide a config file that specifies > + a single entry in datasource_list. The configured value should > + be used.""" > + mydata = copy.deepcopy(VALID_CFG['Ec2-hvm']) > + cfgpath = 'etc/cloud/cloud.cfg.d/myds.cfg' > + mydata['files'][cfgpath] = 'datasource_list: ["NoCloud"]\n' > + self._check_via_dict(mydata, rc=RC_FOUND, dslist=['NoCloud', > DS_NONE]) > + > + > +def blkid_out(disks=None): > + """Convert a list of disk dictionaries into blkid content.""" > + if disks is None: > + disks = [] > + lines = [] > + for disk in disks: > + if not disk["DEVNAME"].startswith("/dev/"): > + disk["DEVNAME"] = "/dev/" + disk["DEVNAME"] > + for key in disk: > + lines.append("%s=%s" % (key, disk[key])) > + lines.append("") > + return '\n'.join(lines) > + > + > +def _print_run_output(rc, out, err, cfg, files): > + """A helper to print return of TestDsIdentify. > + > + _print_run_output(self.call())""" > + print('\n'.join([ > + '-- rc = %s --' % rc, > + '-- out --', str(out), > + '-- err --', str(err), > + '-- cfg --', json_dumps(cfg)])) > + print('-- files --') > + for k, v in files.items(): > + if "/_shwrap" in k: > + continue > + print(' === %s ===' % k) > + for line in v.splitlines(): > + print(" " + line) > + > + > +VALID_CFG = { > + 'Ec2-hvm': { > + 'ds': 'Ec2', > + 'mocks': [{'name': 'detect_virt', 'RET': 'kvm', 'ret': 0}], > + 'files': { > + P_PRODUCT_SERIAL: 'ec23aef5-54be-4843-8d24-8c819f88453e\n', > + P_PRODUCT_UUID: 'EC23AEF5-54BE-4843-8D24-8C819F88453E\n', > + } > + }, > + 'Ec2-xen': { > + 'ds': 'Ec2', > + 'mocks': [{'name': 'detect_virt', 'RET': 'xen', 'ret': 0}], > + 'files': { > + 'sys/hypervisor/uuid': 'ec2c6e2f-5fac-4fc7-9c82-74127ec14bbb\n' > + }, > + }, > + 'Ec2-brightbox': { > + 'ds': 'Ec2', > + 'files': {P_PRODUCT_SERIAL: 'facc6e2f.brightbox.com\n'}, > + }, > + 'GCE': { > + 'ds': 'GCE', > + 'files': {P_PRODUCT_NAME: 'Google Compute Engine\n'}, > + 'mocks': [MOCK_VIRT_IS_KVM], > + }, > + 'GCE-serial': { > + 'ds': 'GCE', > + 'files': {P_PRODUCT_SERIAL: 'GoogleCloud-8f2e88f\n'}, > + 'mocks': [MOCK_VIRT_IS_KVM], > + }, > + 'ConfigDrive': { > + 'ds': 'ConfigDrive', > + 'mocks': [ > + {'name': 'blkid', 'ret': 0, > + 'out': blkid_out( > + [{'DEVNAME': 'vda1', 'TYPE': 'vfat', 'PARTUUID': UUIDS[0]}, > + {'DEVNAME': 'vda2', 'TYPE': 'ext4', > + 'LABEL': 'cloudimg-rootfs', 'PARTUUID': UUIDS[1]}, > + {'DEVNAME': 'vdb', 'TYPE': 'vfat', 'LABEL': 'config-2'}]) > + }, > + ], > + }, > +} > + > +# vi: ts=4 expandtab -- https://code.launchpad.net/~smoser/cloud-init/+git/cloud-init/+merge/323059 Your team cloud init development team is requested to review the proposed merge of ~smoser/cloud-init:feature/ds-identify-test into cloud-init:master. _______________________________________________ Mailing list: https://launchpad.net/~cloud-init-dev Post to : cloud-init-dev@lists.launchpad.net Unsubscribe : https://launchpad.net/~cloud-init-dev More help : https://help.launchpad.net/ListHelp