This is Chris's patch about xml_utils*.
I am including it here just for completeness ^_^,
because it is not available in the master branch yet.
*PLEASE DO NOT COMMENT*
Signed-off-by: Yu Mingfei <[email protected]>
---
client/shared/xml_utils.py | 147 ++++++++++++++++++++++++
client/shared/xml_utils_unittest.py | 213 ++++++++++++++++++++++++++++++++++-
2 files changed, 358 insertions(+), 2 deletions(-)
diff --git a/client/shared/xml_utils.py b/client/shared/xml_utils.py
index d4bae45..cdfef25 100644
--- a/client/shared/xml_utils.py
+++ b/client/shared/xml_utils.py
@@ -3,9 +3,156 @@
in python 2.4 systems.
"""
+import os.path, shutil, tempfile
+
try:
import autotest.common as common
except ImportError:
import common
+import logging
+
from autotest.client.shared import ElementTree
+
+# Used by unittests
+TMPPFX='xml_utils_temp_'
+TMPSFX='.xml'
+
+class TempXMLFile(file):
+ """
+ Temporary XML file removed on instance deletion / unexceptional module exit.
+ """
+
+ def __init__(self, suffix=TMPSFX, prefix=TMPPFX, mode="wb+", buffer=1):
+ """
+ Initialize temporary XML file removed on instance destruction.
+
+ param: suffix: temporary file's suffix
+ param: prefix: temporary file's prefix
+ param: mode: file access mode
+ param: buffer: size of buffer in bytes, 1: line buffered
+ """
+ fd,path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
+ os.close(fd)
+ super(TempXMLFile, self).__init__(path, mode, buffer)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """
+ Always remove temporary file on module exit.
+ """
+ self.__del__()
+ super(TempXMLFile, self).__exit__(exc_type, exc_value, traceback)
+
+ def __del__(self):
+ """
+ Remove temporary file on instance delete.
+ """
+ try:
+ os.unlink(self.name)
+ except OSError:
+ pass # don't care
+
+class XMLBackup(TempXMLFile):
+ """Temporary XML backup, removed on unexceptional destruction."""
+
+ sourcefilename = None
+
+ def __init__(self, sourcefilename):
+ """
+ Initialize a temporary backup from sourcefilename.
+ """
+ super(XMLBackup, self).__init__()
+ self.sourcefilename = sourcefilename
+ self.backup()
+
+ def backup(self):
+ """
+ Overwrite temporary backup with contents of original source.
+ """
+ self.flush()
+ self.seek(0)
+ shutil.copyfileobj(file(self.sourcefilename, "rb"), super(XMLBackup,self))
+ self.seek(0)
+
+ def restore(self):
+ """
+ Overwrite original source with contents of temporary backup
+ """
+ self.flush()
+ self.seek(0)
+ shutil.copyfileobj(super(XMLBackup,self), file(self.sourcefilename, "wb+"))
+ self.seek(0)
+
+ def _info(self):
+ logging.info("Retaining backup of %s in %s", self.sourcefilename,
+ self.name)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ """
+ Remove temporary backup on unexceptional module exit.
+ """
+ if exc_type is None and exc_value is None and traceback is None:
+ super(XMLBackup, self).__del__()
+ else:
+ self._info()
+
+ def __del__(self):
+ """
+ Remove temporary file on instance delete.
+ """
+ self._info()
+
+class XMLBase(ElementTree.ElementTree, XMLBackup):
+ """ElementTree backed by a file copy of source"""
+
+ # Automatically remove temp file on instance destruction
+ tempsource = None
+
+ def __init__(self, xml):
+ """
+ Initialize from a string or filename containing XML source.
+
+ param: xml: A filename or string containing XML
+ """
+ # xml param could be xml string or readable filename
+ if not self.readablefile(xml):
+ self.tempsource = TempXMLFile()
+ self.tempsource.write(xml)
+ # Prevent source modification
+ self.tempsource.close()
+ xml = self.tempsource.name
+ # xml guaranteed to be a filename
+ XMLBackup.__init__(self, sourcefilename=xml)
+ ElementTree.ElementTree.__init__(self,element=None, file=xml)
+
+ @classmethod
+ def readablefile(cls, filename):
+ """
+ Returns True/False if filename exists and is readable
+ """
+ try:
+ test = os.stat(filename)
+ except (OSError, IOError):
+ return False
+ return True
+
+ def restore(self):
+ """
+ Restore if instance initialized from string, otherwise raise IOError.
+ """
+ if self.tempsource:
+ super(XMLBase, self).restore()
+ else:
+ raise IOError, "Can't overwrite %s" % self.sourcefilename
+
+ def write(self, filename=None, encoding="UTF-8"):
+ """
+ Write current XML tree to filename, or self.name if None.
+ """
+ if filename is None:
+ filename = self.name
+ ElementTree.ElementTree.write(self, filename, encoding)
+
+ def read(self, xml):
+ self.__del__()
+ self.__init__(xml)
diff --git a/client/shared/xml_utils_unittest.py b/client/shared/xml_utils_unittest.py
index 6faf434..3af9161 100755
--- a/client/shared/xml_utils_unittest.py
+++ b/client/shared/xml_utils_unittest.py
@@ -1,6 +1,6 @@
#!/usr/bin/python
-import unittest
+import unittest, tempfile, os, glob
try:
import autotest.common as common
@@ -10,11 +10,220 @@ except ImportError:
from autotest.client.shared import xml_utils, ElementTree
-class test_xml_utils(unittest.TestCase):
+class xml_test_data(unittest.TestCase):
+
+ def setUp(self):
+ # Compacted to save excess scrolling
+ self.TEXT_REPLACE_KEY="TEST_XML_TEXT_REPLACE"
+ self.XMLSTR="""<?xml version='1.0' encoding='UTF-8'?><capabilities><host>
+ <uuid>4d515db1-9adc-477d-8195-f817681e72e6</uuid><cpu><arch>x86_64</arch>
+ <model>Westmere</model><vendor>Intel</vendor><topology sockets='1'
+ cores='2' threads='2'/><feature name='rdtscp'/><feature name='x2apic'/>
+ <feature name='xtpr'/><feature name='tm2'/><feature name='est'/>
+ <feature name='vmx'/><feature name='ds_cpl'/><feature name='monitor'/>
+ <feature name='pbe'/><feature name='tm'/><feature name='ht'/><feature
+ name='ss'/><feature name='acpi'/><feature name='ds'/><feature
+ name='vme'/></cpu><migration_features><live/><uri_transports>
+ <uri_transport>tcp</uri_transport></uri_transports>
+ </migration_features><topology><cells num='1'><cell id='0'><cpus
+ num='4'><cpu id='0'/><cpu id='1'/><cpu id='2'/><cpu id='3'/></cpus>
+ </cell></cells></topology><secmodel><model>selinux</model><doi>0</doi>
+ </secmodel></host><guest><os_type>hvm</os_type><arch name='i686'>
+ <wordsize>32</wordsize><emulator>TEST_XML_TEXT_REPLACE</emulator>
+ <machine>rhel6.2.0</machine><machine canonical='rhel6.2.0'>pc</machine>
+ <machine>rhel6.1.0</machine><machine>rhel6.0.0</machine><machine>
+ rhel5.5.0</machine><machine>rhel5.4.4</machine><machine>rhel5.4.0
+ </machine><domain type='qemu'></domain><domain type='kvm'><emulator>
+ /usr/libexec/qemu-kvm</emulator></domain></arch><features><cpuselection
+ /><deviceboot/><pae/><nonpae/><acpi default='on' toggle='yes'/><apic
+ default='on' toggle='no'/></features></guest></capabilities>"""
+ (fd, self.XMLFILE) = tempfile.mkstemp(suffix=xml_utils.TMPSFX,
+ prefix=xml_utils.TMPPFX)
+ os.write(fd, self.XMLSTR)
+ os.close(fd)
+ self.canonicalize_test_xml()
+
+ def tearDown(self):
+ for filename in glob.glob(os.path.join('/tmp', "%s*%s" %
+ (xml_utils.TMPPFX, xml_utils.TMPSFX)
+ )):
+ os.unlink(filename)
+
+ def canonicalize_test_xml(self):
+ et = ElementTree.parse(self.XMLFILE)
+ et.write(self.XMLFILE, encoding="UTF-8")
+ f = file(self.XMLFILE)
+ self.XMLSTR = f.read()
+ f.close()
+
+class test_ElementTree(xml_test_data):
def test_bundled_elementtree(self):
self.assertEqual(xml_utils.ElementTree.VERSION, ElementTree.VERSION)
+class test_TempXMLFile(xml_test_data):
+
+ def test_prefix_sufix(self):
+ filename = os.path.basename(self.XMLFILE)
+ self.assert_(filename.startswith(xml_utils.TMPPFX))
+ self.assert_(filename.endswith(xml_utils.TMPSFX))
+
+ def test_test_TempXMLFile_canread(self):
+ tmpf = xml_utils.TempXMLFile()
+ tmpf.write(self.XMLSTR)
+ tmpf.seek(0)
+ stuff = tmpf.read()
+ self.assertEqual(stuff, self.XMLSTR)
+ del tmpf
+
+ def test_TempXMLFile_implicit(self):
+ def out_of_scope_tempxmlfile():
+ tmpf = xml_utils.TempXMLFile()
+ return tmpf.name
+ self.assertRaises(OSError, os.stat, out_of_scope_tempxmlfile())
+
+
+ def test_TempXMLFile_explicit(self):
+ tmpf = xml_utils.TempXMLFile()
+ tmpf_name = tmpf.name
+ # Assert this does NOT raise an exception
+ os.stat(tmpf_name)
+ del tmpf
+ self.assertRaises(OSError, os.stat, tmpf_name)
+
+
+class test_XMLBackup(xml_test_data):
+
+ class_to_test = xml_utils.XMLBackup
+
+ def is_same_contents(self, filename):
+ f = file(filename, "rb")
+ s = f.read()
+ return s == self.XMLSTR
+
+ def test_backup_filename(self):
+ xmlbackup = self.class_to_test(self.XMLFILE)
+ self.assertEqual(xmlbackup.sourcefilename, self.XMLFILE)
+
+ def test_backup_file(self):
+ xmlbackup = self.class_to_test(self.XMLFILE)
+ self.assertTrue(self.is_same_contents(xmlbackup.name))
+
+ def test_rebackup_file(self):
+ xmlbackup = self.class_to_test(self.XMLFILE)
+ oops = file(xmlbackup.name, "wb")
+ oops.write("foobar")
+ oops.close()
+ self.assertFalse(self.is_same_contents(xmlbackup.name))
+ xmlbackup.backup()
+ self.assertTrue(self.is_same_contents(xmlbackup.name))
+
+ def test_restore_file(self):
+ xmlbackup = self.class_to_test(self.XMLFILE)
+ # nuke source
+ os.unlink(xmlbackup.sourcefilename)
+ xmlbackup.restore()
+ self.assertTrue(self.is_same_contents(xmlbackup.name))
+
+ def test_remove_backup_file(self):
+ xmlbackup = self.class_to_test(self.XMLFILE)
+ filename = xmlbackup.name
+ os.unlink(filename)
+ del xmlbackup
+ self.assertRaises(OSError, os.unlink, filename)
+
+ def test_TempXMLBackup_implicit(self):
+ def out_of_scope_xmlbackup():
+ tmpf = self.class_to_test(self.XMLFILE)
+ return tmpf.name
+ filename = out_of_scope_xmlbackup()
+ # DOES NOT delete
+ self.assertTrue(self.is_same_contents(filename))
+ os.unlink(filename)
+
+ def test_TempXMLBackup_exception_exit(self):
+ tmpf = self.class_to_test(self.XMLFILE)
+ filename = tmpf.name
+ # simulate exception exit DOES NOT DELETE
+ tmpf.__exit__(Exception, "foo", "bar")
+ self.assertTrue(self.is_same_contents(filename))
+ os.unlink(filename)
+
+ def test_TempXMLBackup_unexception_exit(self):
+ tmpf = self.class_to_test(self.XMLFILE)
+ filename = tmpf.name
+ # simulate normal exit DOES DELETE
+ tmpf.__exit__(None, None, None)
+ self.assertRaises(OSError, os.unlink, filename)
+
+
+class test_XMLBase(test_XMLBackup):
+
+ class_to_test = xml_utils.XMLBase
+
+ def test_init_str(self):
+ xml = self.class_to_test(self.XMLSTR)
+ self.assert_(xml.tempsource is not None)
+
+ def test_init_xml(self):
+ xml = self.class_to_test(self.XMLFILE)
+ self.assert_(xml.tempsource is None)
+
+ def test_restore_file(self):
+ # String source generates a restorable temporary file
+ xmlbackup = self.class_to_test(self.XMLSTR)
+ os.unlink(xmlbackup.sourcefilename)
+ xmlbackup.restore()
+ self.assertTrue(self.is_same_contents(xmlbackup.tempsource.name))
+
+ def test_restore_fails(self):
+ xmlbackup = self.class_to_test(self.XMLFILE)
+ self.assertRaises(IOError, xmlbackup.restore)
+
+ def test_write_default(self):
+ xmlbackup = self.class_to_test(self.XMLFILE)
+ wordsize = xmlbackup.getroot().find('guest').find('arch').find('wordsize')
+ self.assertTrue(wordsize is not None)
+ self.assertEqual(int(wordsize.text), 32)
+ wordsize.text = str(64)
+ xmlbackup.write()
+ self.assertFalse(self.is_same_contents(xmlbackup.name))
+
+ def test_write_other(self):
+ xmlbackup = self.class_to_test(self.XMLFILE)
+ otherfile = xml_utils.TempXMLFile()
+ xmlbackup.write(otherfile)
+ otherfile.close()
+ self.assertTrue(self.is_same_contents(otherfile.name))
+
+ def test_write_other_changed(self):
+ xmlbackup = self.class_to_test(self.XMLSTR)
+ otherfile = xml_utils.TempXMLFile()
+ wordsize = xmlbackup.getroot().find('guest').find('arch').find('wordsize')
+ wordsize.text = str(64)
+ xmlbackup.write(otherfile)
+ otherfile.close()
+ xmlbackup.write(self.XMLFILE)
+ xmlbackup.close()
+ self.canonicalize_test_xml()
+ self.assertTrue(self.is_same_contents(otherfile.name))
+
+ def test_read_other_changed(self):
+ xmlbackup = self.class_to_test(self.XMLSTR)
+ wordsize = xmlbackup.getroot().find('guest').find('arch').find('wordsize')
+ wordsize.text = str(64)
+ otherfile = xml_utils.TempXMLFile()
+ xmlbackup.write(otherfile)
+ otherfile.close()
+ otherfile.close()
+ xmlbackup.backup()
+ self.assertTrue(self.is_same_contents(xmlbackup.name))
+ xmlbackup.read(otherfile.name)
+ self.assertFalse(self.is_same_contents(otherfile.name))
+ xmlbackup.write(self.XMLFILE)
+ self.assertFalse(self.is_same_contents(otherfile.name))
+ self.canonicalize_test_xml()
+ self.assertTrue(self.is_same_contents(otherfile.name))
if __name__ == "__main__":
unittest.main()
--
1.7.1
--
Best Regards
Yu Mingfei
_______________________________________________
Autotest-kernel mailing list
[email protected]
https://www.redhat.com/mailman/listinfo/autotest-kernel