This is Chris's patch for xml_utils*.
I am posting it here just for completeness ^_^
because it is not available in the master branch yet.

*PLEASE DO NOT COMMENT*

Signed-off-by: Yu Mingfei <[email protected]>
---
 client/shared/xml_utils.py          |  147 ++++++++++++++++++++++++
 client/shared/xml_utils_unittest.py |  213 ++++++++++++++++++++++++++++++++++-
 2 files changed, 358 insertions(+), 2 deletions(-)

diff --git a/client/shared/xml_utils.py b/client/shared/xml_utils.py
index d4bae45..cdfef25 100644
--- a/client/shared/xml_utils.py
+++ b/client/shared/xml_utils.py
@@ -3,9 +3,156 @@
     in python 2.4 systems.
 """
+import os.path, shutil, tempfile
+
 try:
     import autotest.common as common
 except ImportError:
     import common
+import logging
+
 from autotest.client.shared import ElementTree
+
+# Used by unittests
+TMPPFX='xml_utils_temp_'
+TMPSFX='.xml'
+
+class TempXMLFile(file):
+    """
+    Temporary XML file removed on instance deletion / unexceptional module exit.
+    """
+
+    def __init__(self, suffix=TMPSFX, prefix=TMPPFX, mode="wb+", buffer=1):
+        """
+        Initialize temporary XML file removed on instance destruction.
+
+        param: suffix: temporary file's suffix
+        param: prefix: temporary file's prefix
+        param: mode: file access mode
+        param: buffer: size of buffer in bytes, 1: line buffered
+        """
+        fd,path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
+        os.close(fd)
+        super(TempXMLFile, self).__init__(path, mode, buffer)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """
+        Always remove temporary file on module exit.
+        """
+        self.__del__()
+        super(TempXMLFile, self).__exit__(exc_type, exc_value, traceback)
+
+    def __del__(self):
+        """
+        Remove temporary file on instance delete.
+        """
+        try:
+            os.unlink(self.name)
+        except OSError:
+            pass # don't care
+
+class XMLBackup(TempXMLFile):
+    """Temporary XML backup, removed on unexceptional destruction."""
+
+    sourcefilename = None
+
+    def __init__(self, sourcefilename):
+        """
+        Initialize a temporary backup from sourcefilename.
+        """
+        super(XMLBackup, self).__init__()
+        self.sourcefilename = sourcefilename
+        self.backup()
+
+    def backup(self):
+        """
+        Overwrite temporary backup with contents of original source.
+        """
+        self.flush()
+        self.seek(0)
+        shutil.copyfileobj(file(self.sourcefilename, "rb"), super(XMLBackup,self))
+        self.seek(0)
+
+    def restore(self):
+        """
+        Overwrite original source with contents of temporary backup
+        """
+        self.flush()
+        self.seek(0)
+        shutil.copyfileobj(super(XMLBackup,self), file(self.sourcefilename, "wb+"))
+        self.seek(0)
+
+    def _info(self):
+        logging.info("Retaining backup of %s in %s", self.sourcefilename,
+                                                     self.name)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """
+        Remove temporary backup on unexceptional module exit.
+        """
+        if exc_type is None and exc_value is None and traceback is None:
+            super(XMLBackup, self).__del__()
+        else:
+            self._info()
+
+    def __del__(self):
+        """
+        Retain temporary backup on instance delete; only log its location.
+        """
+        self._info()
+
+class XMLBase(ElementTree.ElementTree, XMLBackup):
+    """ElementTree backed by a file copy of source"""
+
+    # Automatically remove temp file on instance destruction
+    tempsource = None
+
+    def __init__(self, xml):
+        """
+        Initialize from a string or filename containing XML source.
+
+        param: xml: A filename or string containing XML
+        """
+        # xml param could be xml string or readable filename
+        if not self.readablefile(xml):
+            self.tempsource = TempXMLFile()
+            self.tempsource.write(xml)
+            # Prevent source modification
+            self.tempsource.close()
+            xml = self.tempsource.name
+        # xml guaranteed to be a filename
+        XMLBackup.__init__(self, sourcefilename=xml)
+        ElementTree.ElementTree.__init__(self,element=None, file=xml)
+
+    @classmethod
+    def readablefile(cls, filename):
+        """
+        Returns True if filename exists (os.stat() succeeds), False otherwise
+        """
+        try:
+            test = os.stat(filename)
+        except (OSError, IOError):
+            return False
+        return True
+
+    def restore(self):
+        """
+        Restore if instance initialized from string, otherwise raise IOError.
+        """
+        if self.tempsource:
+            super(XMLBase, self).restore()
+        else:
+            raise IOError, "Can't overwrite %s" % self.sourcefilename
+
+    def write(self, filename=None, encoding="UTF-8"):
+        """
+        Write current XML tree to filename, or self.name if None.
+        """
+        if filename is None:
+            filename = self.name
+        ElementTree.ElementTree.write(self, filename, encoding)
+
+    def read(self, xml):
+        self.__del__()
+        self.__init__(xml)
diff --git a/client/shared/xml_utils_unittest.py b/client/shared/xml_utils_unittest.py
index 6faf434..3af9161 100755
--- a/client/shared/xml_utils_unittest.py
+++ b/client/shared/xml_utils_unittest.py
@@ -1,6 +1,6 @@
 #!/usr/bin/python
-import unittest
+import unittest, tempfile, os, glob
 try:
     import autotest.common as common
@@ -10,11 +10,220 @@ except ImportError:
 from autotest.client.shared import xml_utils, ElementTree
-class test_xml_utils(unittest.TestCase):
+class xml_test_data(unittest.TestCase):
+
+    def setUp(self):
+        # Compacted to save excess scrolling
+        self.TEXT_REPLACE_KEY="TEST_XML_TEXT_REPLACE"
+        self.XMLSTR="""<?xml version='1.0' encoding='UTF-8'?><capabilities><host>
+        <uuid>4d515db1-9adc-477d-8195-f817681e72e6</uuid><cpu><arch>x86_64</arch>
+        <model>Westmere</model><vendor>Intel</vendor><topology sockets='1'
+        cores='2' threads='2'/><feature name='rdtscp'/><feature name='x2apic'/>
+        <feature name='xtpr'/><feature name='tm2'/><feature name='est'/>
+        <feature name='vmx'/><feature name='ds_cpl'/><feature name='monitor'/>
+        <feature name='pbe'/><feature name='tm'/><feature name='ht'/><feature
+        name='ss'/><feature name='acpi'/><feature name='ds'/><feature
+        name='vme'/></cpu><migration_features><live/><uri_transports>
+        <uri_transport>tcp</uri_transport></uri_transports>
+        </migration_features><topology><cells num='1'><cell id='0'><cpus
+        num='4'><cpu id='0'/><cpu id='1'/><cpu id='2'/><cpu id='3'/></cpus>
+        </cell></cells></topology><secmodel><model>selinux</model><doi>0</doi>
+        </secmodel></host><guest><os_type>hvm</os_type><arch name='i686'>
+        <wordsize>32</wordsize><emulator>TEST_XML_TEXT_REPLACE</emulator>
+        <machine>rhel6.2.0</machine><machine canonical='rhel6.2.0'>pc</machine>
+        <machine>rhel6.1.0</machine><machine>rhel6.0.0</machine><machine>
+        rhel5.5.0</machine><machine>rhel5.4.4</machine><machine>rhel5.4.0
+        </machine><domain type='qemu'></domain><domain type='kvm'><emulator>
+        /usr/libexec/qemu-kvm</emulator></domain></arch><features><cpuselection
+        /><deviceboot/><pae/><nonpae/><acpi default='on' toggle='yes'/><apic
+        default='on' toggle='no'/></features></guest></capabilities>"""
+        (fd, self.XMLFILE) = tempfile.mkstemp(suffix=xml_utils.TMPSFX,
+                                              prefix=xml_utils.TMPPFX)
+        os.write(fd, self.XMLSTR)
+        os.close(fd)
+        self.canonicalize_test_xml()
+
+    def tearDown(self):
+        for filename in glob.glob(os.path.join('/tmp', "%s*%s" %
+                                               (xml_utils.TMPPFX, xml_utils.TMPSFX)
+                                              )):
+            os.unlink(filename)
+
+    def canonicalize_test_xml(self):
+        et = ElementTree.parse(self.XMLFILE)
+        et.write(self.XMLFILE, encoding="UTF-8")
+        f = file(self.XMLFILE)
+        self.XMLSTR = f.read()
+        f.close()
+
+class test_ElementTree(xml_test_data):
     def test_bundled_elementtree(self):
         self.assertEqual(xml_utils.ElementTree.VERSION, ElementTree.VERSION)
+class test_TempXMLFile(xml_test_data):
+
+    def test_prefix_sufix(self):
+        filename = os.path.basename(self.XMLFILE)
+        self.assert_(filename.startswith(xml_utils.TMPPFX))
+        self.assert_(filename.endswith(xml_utils.TMPSFX))
+
+    def test_test_TempXMLFile_canread(self):
+        tmpf = xml_utils.TempXMLFile()
+        tmpf.write(self.XMLSTR)
+        tmpf.seek(0)
+        stuff = tmpf.read()
+        self.assertEqual(stuff, self.XMLSTR)
+        del tmpf
+
+    def test_TempXMLFile_implicit(self):
+        def out_of_scope_tempxmlfile():
+            tmpf = xml_utils.TempXMLFile()
+            return tmpf.name
+        self.assertRaises(OSError, os.stat, out_of_scope_tempxmlfile())
+
+
+    def test_TempXMLFile_explicit(self):
+        tmpf = xml_utils.TempXMLFile()
+        tmpf_name = tmpf.name
+        # Assert this does NOT raise an exception
+        os.stat(tmpf_name)
+        del tmpf
+        self.assertRaises(OSError, os.stat, tmpf_name)
+
+
+class test_XMLBackup(xml_test_data):
+
+    class_to_test = xml_utils.XMLBackup
+
+    def is_same_contents(self, filename):
+        f = file(filename, "rb")
+        s = f.read()
+        return s == self.XMLSTR
+
+    def test_backup_filename(self):
+        xmlbackup = self.class_to_test(self.XMLFILE)
+        self.assertEqual(xmlbackup.sourcefilename, self.XMLFILE)
+
+    def test_backup_file(self):
+        xmlbackup = self.class_to_test(self.XMLFILE)
+        self.assertTrue(self.is_same_contents(xmlbackup.name))
+
+    def test_rebackup_file(self):
+        xmlbackup = self.class_to_test(self.XMLFILE)
+        oops = file(xmlbackup.name, "wb")
+        oops.write("foobar")
+        oops.close()
+        self.assertFalse(self.is_same_contents(xmlbackup.name))
+        xmlbackup.backup()
+        self.assertTrue(self.is_same_contents(xmlbackup.name))
+
+    def test_restore_file(self):
+        xmlbackup = self.class_to_test(self.XMLFILE)
+        # nuke source
+        os.unlink(xmlbackup.sourcefilename)
+        xmlbackup.restore()
+        self.assertTrue(self.is_same_contents(xmlbackup.name))
+
+    def test_remove_backup_file(self):
+        xmlbackup = self.class_to_test(self.XMLFILE)
+        filename = xmlbackup.name
+        os.unlink(filename)
+        del xmlbackup
+        self.assertRaises(OSError, os.unlink, filename)
+
+    def test_TempXMLBackup_implicit(self):
+        def out_of_scope_xmlbackup():
+            tmpf = self.class_to_test(self.XMLFILE)
+            return tmpf.name
+        filename = out_of_scope_xmlbackup()
+        #  DOES NOT delete
+        self.assertTrue(self.is_same_contents(filename))
+        os.unlink(filename)
+
+    def test_TempXMLBackup_exception_exit(self):
+        tmpf = self.class_to_test(self.XMLFILE)
+        filename = tmpf.name
+        # simulate exception exit DOES NOT DELETE
+        tmpf.__exit__(Exception, "foo", "bar")
+        self.assertTrue(self.is_same_contents(filename))
+        os.unlink(filename)
+
+    def test_TempXMLBackup_unexception_exit(self):
+        tmpf = self.class_to_test(self.XMLFILE)
+        filename = tmpf.name
+        # simulate normal exit DOES DELETE
+        tmpf.__exit__(None, None, None)
+        self.assertRaises(OSError, os.unlink, filename)
+
+
+class test_XMLBase(test_XMLBackup):
+
+    class_to_test = xml_utils.XMLBase
+
+    def test_init_str(self):
+        xml = self.class_to_test(self.XMLSTR)
+        self.assert_(xml.tempsource is not None)
+
+    def test_init_xml(self):
+        xml = self.class_to_test(self.XMLFILE)
+        self.assert_(xml.tempsource is None)
+
+    def test_restore_file(self):
+        # String source generates a restorable temporary file
+        xmlbackup = self.class_to_test(self.XMLSTR)
+        os.unlink(xmlbackup.sourcefilename)
+        xmlbackup.restore()
+        self.assertTrue(self.is_same_contents(xmlbackup.tempsource.name))
+
+    def test_restore_fails(self):
+        xmlbackup = self.class_to_test(self.XMLFILE)
+        self.assertRaises(IOError, xmlbackup.restore)
+
+    def test_write_default(self):
+        xmlbackup = self.class_to_test(self.XMLFILE)
+        wordsize = xmlbackup.getroot().find('guest').find('arch').find('wordsize')
+        self.assertTrue(wordsize is not None)
+        self.assertEqual(int(wordsize.text), 32)
+        wordsize.text = str(64)
+        xmlbackup.write()
+        self.assertFalse(self.is_same_contents(xmlbackup.name))
+
+    def test_write_other(self):
+        xmlbackup = self.class_to_test(self.XMLFILE)
+        otherfile = xml_utils.TempXMLFile()
+        xmlbackup.write(otherfile)
+        otherfile.close()
+        self.assertTrue(self.is_same_contents(otherfile.name))
+
+    def test_write_other_changed(self):
+        xmlbackup = self.class_to_test(self.XMLSTR)
+        otherfile = xml_utils.TempXMLFile()
+        wordsize = xmlbackup.getroot().find('guest').find('arch').find('wordsize')
+        wordsize.text = str(64)
+        xmlbackup.write(otherfile)
+        otherfile.close()
+        xmlbackup.write(self.XMLFILE)
+        xmlbackup.close()
+        self.canonicalize_test_xml()
+        self.assertTrue(self.is_same_contents(otherfile.name))
+
+    def test_read_other_changed(self):
+        xmlbackup = self.class_to_test(self.XMLSTR)
+        wordsize = xmlbackup.getroot().find('guest').find('arch').find('wordsize')
+        wordsize.text = str(64)
+        otherfile = xml_utils.TempXMLFile()
+        xmlbackup.write(otherfile)
+        otherfile.close()
+        otherfile.close()
+        xmlbackup.backup()
+        self.assertTrue(self.is_same_contents(xmlbackup.name))
+        xmlbackup.read(otherfile.name)
+        self.assertFalse(self.is_same_contents(otherfile.name))
+        xmlbackup.write(self.XMLFILE)
+        self.assertFalse(self.is_same_contents(otherfile.name))
+        self.canonicalize_test_xml()
+        self.assertTrue(self.is_same_contents(otherfile.name))
 if __name__ == "__main__":
     unittest.main()
--
1.7.1


--
Best Regards
Yu Mingfei

_______________________________________________
Autotest-kernel mailing list
[email protected]
https://www.redhat.com/mailman/listinfo/autotest-kernel

Reply via email to