Added a directory-contents-based source for "mirror" and updated the
unit tests.

Signed-off-by: Mihai Rusu <[email protected]>

--- autotest/mirror/source.py   2010-05-10 11:32:46.000000000 -0700
+++ autotest/mirror/source.py   2010-05-10 11:32:46.000000000 -0700
@@ -1,6 +1,6 @@
 # Copyright 2009 Google Inc. Released under the GPL v2
 
-import re, time, urllib2, urlparse, HTMLParser
+import os, re, time, urllib2, urlparse, HTMLParser
 
 from autotest_lib.mirror import database
 from autotest_lib.client.common_lib import utils
@@ -183,3 +183,46 @@
                     files[item.name] = item
 
         return self._get_new_files(files)
+
+
+class directory_source(source):
+    """
+    Source that finds kernel files by listing the contents of a directory.
+    """
+    def __init__(self, database, path):
+        """
+        Initialize a directory_source instance.
+
+        @param database: Persistent database with known kernels information.
+        @param path: Path to the directory with the kernel files found by
+                this source.
+        """
+        super(directory_source, self).__init__(database)
+
+        self._path = path
+
+
+    def get_new_files(self, _stat_func=os.stat):
+        """
+        Main function, see source.get_new_files(). Maps each directory entry
+        name to a database.item(full path, size, int mtime) and returns
+        self._get_new_files() of that map. os.listdir() errors propagate.
+        @param _stat_func: stat() replacement for unit tests; stubbing os.stat
+                itself gets failures reported confusingly because the unit
+                test framework tries to stat() the unit test file.
+        """
+        all_files = {}
+        for filename in os.listdir(self._path):
+            full_filename = os.path.join(self._path, filename)
+            try:
+                stat_data = _stat_func(full_filename)
+            except OSError:
+                # File might have been removed/renamed since we listed the
+                # directory so skip it.
+                continue
+
+            item = database.item(full_filename, stat_data.st_size,
+                                 int(stat_data.st_mtime))
+            all_files[filename] = item
+
+        return self._get_new_files(all_files)
--- autotest/mirror/source_unittest.py  2010-05-10 11:32:46.000000000 -0700
+++ autotest/mirror/source_unittest.py  2010-05-10 11:32:46.000000000 -0700
@@ -8,7 +8,28 @@
 from autotest_lib.client.common_lib.test_utils import mock
 
 
-class rsync_source_unittest(unittest.TestCase):
+class common_source(unittest.TestCase):
+    """
+    Common support for source unit tests: mock god, db mock and a pinned TZ.
+    """
+    def setUp(self):
+        self.god = mock.mock_god()
+        self.db_mock = self.god.create_mock_class(
+                source.database.database, 'database')
+
+        # Set fixed timezone so parsing time values does not break.
+        self._old_tz = os.environ.get('TZ', '')
+        os.environ['TZ'] = 'America/Los_Angeles'
+        time.tzset()
+
+
+    def tearDown(self):
+        self.god.unstub_all()
+        os.environ['TZ'] = self._old_tz
+        time.tzset()
+
+
+class rsync_source_unittest(common_source):
     _cmd_template = '/usr/bin/rsync -rltz --no-motd %s %s/%s'
     _prefix = 'rsync://rsync.kernel.org/pub/linux/kernel'
     _path1 = 'v2.6/patch-2.6.*.bz2'
@@ -61,19 +82,9 @@
         }
 
     def setUp(self):
-        self.god = mock.mock_god()
-        self.db_mock = self.god.create_mock_class(
-            source.database.database, 'database')
-        self.god.stub_function(source.utils, 'system_output')
-        self.old_tz = getattr(os.environ, 'TZ', '')
-        os.environ['TZ'] = 'America/Los_Angeles'
-        time.tzset()
+        super(rsync_source_unittest, self).setUp()
 
-
-    def tearDown(self):
-        self.god.unstub_all()
-        os.environ['TZ'] = self.old_tz
-        time.tzset()
+        self.god.stub_function(source.utils, 'system_output')
 
 
     def test_simple(self):
@@ -118,7 +129,7 @@
         self.god.check_playback()
 
 
-class url_source_unittest(unittest.TestCase):
+class url_source_unittest(common_source):
     _prefix = 'http://www.kernel.org/pub/linux/kernel/'
 
     _path1 = 'v2.6/'
@@ -221,23 +232,13 @@
         }
 
     def setUp(self):
-        self.god = mock.mock_god()
-        self.db_mock = self.god.create_mock_class(
-            source.database.database, 'database')
+        super(url_source_unittest, self).setUp()
+
         self.god.stub_function(source.urllib2, 'urlopen')
         self.addinfourl_mock = self.god.create_mock_class(
             source.urllib2.addinfourl, 'addinfourl')
         self.mime_mock = self.god.create_mock_class(
             httplib.HTTPMessage, 'HTTPMessage')
-        self.old_tz = getattr(os.environ, 'TZ', '')
-        os.environ['TZ'] = 'America/Los_Angeles'
-        time.tzset()
-
-
-    def tearDown(self):
-        self.god.unstub_all()
-        os.environ['TZ'] = self.old_tz
-        time.tzset()
 
 
     def test_get_new_files(self):
@@ -270,5 +271,95 @@
         self.god.check_playback()
 
 
+class directory_source_unittest(common_source):
+    """
+    Unit test class for directory_source.
+    """
+    def setUp(self):
+        super(directory_source_unittest, self).setUp()
+
+        self.god.stub_function(os, 'listdir')
+        self._stat_mock = self.god.create_mock_function('stat')
+
+
+    @staticmethod
+    def _get_stat_result(mode=0644, ino=12345, dev=12345, nlink=1, uid=1000,
+                         gid=1000, size=10, atime=123, mtime=123, ctime=123):
+        """
+        Build an os.stat_result() instance with many default values.
+
+        @param mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime:
+                See help(os.stat_result).
+        """
+        return os.stat_result((mode, ino, dev, nlink, uid, gid, size, atime,
+                               mtime, ctime))
+
+
+    def test_get_new_files_invalid_path(self):
+        """
+        Test directory_source.get_new_files() on an invalid path.
+        """
+        path = '/some/invalid/path'
+        os.listdir.expect_call(path).and_raises(OSError('Error'))
+
+        s = source.directory_source(self.db_mock, path)
+        self.assertRaises(OSError, s.get_new_files)
+        self.god.check_playback()
+
+
+    def test_get_new_files_stat_fails(self):
+        """
+        Test directory_source.get_new_files() when stat fails.
+        """
+        path = '/some/valid/path'
+        os.listdir.expect_call(path).and_return(['file1', 'file2'])
+        (self._stat_mock.expect_call(os.path.join(path, 'file1'))
+                .and_raises(OSError('Error')))
+        stat_result = self._get_stat_result(size=1010, mtime=123)
+        file2_full_path = os.path.join(path, 'file2')
+        self._stat_mock.expect_call(file2_full_path).and_return(stat_result)
+
+        self.db_mock.get_dictionary.expect_call().and_return({})
+
+        s = source.directory_source(self.db_mock, path)
+        expected = {'file2': source.database.item(file2_full_path, 1010, 123)}
+        self.assertEquals(expected, s.get_new_files(_stat_func=self._stat_mock))
+        self.god.check_playback()
+
+
+    def test_get_new_files_success(self):
+        """
+        Test directory_source.get_new_files() success.
+        """
+        path = '/some/valid/path'
+        file1_full_path = os.path.join(path, 'file1')
+        file2_full_path = os.path.join(path, 'file2')
+        file3_full_path = os.path.join(path, 'file3')
+
+        os.listdir.expect_call(path).and_return(['file1', 'file2', 'file3'])
+
+        stat_result = self._get_stat_result(size=1010, mtime=123)
+        self._stat_mock.expect_call(file1_full_path).and_return(stat_result)
+
+        stat_result = self._get_stat_result(size=1020, mtime=1234)
+        self._stat_mock.expect_call(file2_full_path).and_return(stat_result)
+
+        stat_result = self._get_stat_result(size=1030, mtime=12345)
+        self._stat_mock.expect_call(file3_full_path).and_return(stat_result)
+
+        known_files = {
+                'file2': source.database.item(file2_full_path, 1020, 1234),
+                }
+        self.db_mock.get_dictionary.expect_call().and_return(known_files)
+
+        s = source.directory_source(self.db_mock, path)
+        expected = {
+                'file1': source.database.item(file1_full_path, 1010, 123),
+                'file3': source.database.item(file3_full_path, 1030, 12345),
+                }
+        self.assertEquals(expected, s.get_new_files(_stat_func=self._stat_mock))
+        self.god.check_playback()
+
+
 if __name__ == "__main__":
     unittest.main()
_______________________________________________
Autotest mailing list
[email protected]
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest

Reply via email to