Added a directory-contents-based source for "mirror" and updated the unit tests.
Signed-off-by: Mihai Rusu <[email protected]> --- autotest/mirror/source.py 2010-05-10 11:32:46.000000000 -0700 +++ autotest/mirror/source.py 2010-05-10 11:32:46.000000000 -0700 @@ -1,6 +1,6 @@ # Copyright 2009 Google Inc. Released under the GPL v2 -import re, time, urllib2, urlparse, HTMLParser +import os, re, time, urllib2, urlparse, HTMLParser from autotest_lib.mirror import database from autotest_lib.client.common_lib import utils @@ -183,3 +183,46 @@ files[item.name] = item return self._get_new_files(files) + + +class directory_source(source): + """ + Source that finds kernel files by listing the contents of a directory. + """ + def __init__(self, database, path): + """ + Initialize a directory_source instance. + + @param database: Persistent database with known kernels information. + @param path: Path to the directory with the kernel files found by + this source. + """ + super(directory_source, self).__init__(database) + + self._path = path + + + def get_new_files(self, _stat_func=os.stat): + """ + Main function, see source.get_new_files(). + + @param _stat_func: Used for unit testing, if we stub os.stat in the + unit test then unit test failures get reported confusingly + because the unit test framework tries to stat() the unit test + file. + """ + all_files = {} + for filename in os.listdir(self._path): + full_filename = os.path.join(self._path, filename) + try: + stat_data = _stat_func(full_filename) + except OSError: + # File might have been removed/renamed since we listed the + # directory so skip it. 
+ continue + + item = database.item(full_filename, stat_data.st_size, + int(stat_data.st_mtime)) + all_files[filename] = item + + return self._get_new_files(all_files) --- autotest/mirror/source_unittest.py 2010-05-10 11:32:46.000000000 -0700 +++ autotest/mirror/source_unittest.py 2010-05-10 11:32:46.000000000 -0700 @@ -8,7 +8,28 @@ from autotest_lib.client.common_lib.test_utils import mock -class rsync_source_unittest(unittest.TestCase): +class common_source(unittest.TestCase): + """ + Common support class for source unit tests. + """ + def setUp(self): + self.god = mock.mock_god() + self.db_mock = self.god.create_mock_class( + source.database.database, 'database') + + # Set fixed timezone so parsing time values does not break. + self._old_tz = getattr(os.environ, 'TZ', '') + os.environ['TZ'] = 'America/Los_Angeles' + time.tzset() + + + def tearDown(self): + self.god.unstub_all() + os.environ['TZ'] = self._old_tz + time.tzset() + + +class rsync_source_unittest(common_source): _cmd_template = '/usr/bin/rsync -rltz --no-motd %s %s/%s' _prefix = 'rsync://rsync.kernel.org/pub/linux/kernel' _path1 = 'v2.6/patch-2.6.*.bz2' @@ -61,19 +82,9 @@ } def setUp(self): - self.god = mock.mock_god() - self.db_mock = self.god.create_mock_class( - source.database.database, 'database') - self.god.stub_function(source.utils, 'system_output') - self.old_tz = getattr(os.environ, 'TZ', '') - os.environ['TZ'] = 'America/Los_Angeles' - time.tzset() + super(rsync_source_unittest, self).setUp() - - def tearDown(self): - self.god.unstub_all() - os.environ['TZ'] = self.old_tz - time.tzset() + self.god.stub_function(source.utils, 'system_output') def test_simple(self): @@ -118,7 +129,7 @@ self.god.check_playback() -class url_source_unittest(unittest.TestCase): +class url_source_unittest(common_source): _prefix = 'http://www.kernel.org/pub/linux/kernel/' _path1 = 'v2.6/' @@ -221,23 +232,13 @@ } def setUp(self): - self.god = mock.mock_god() - self.db_mock = self.god.create_mock_class( - 
source.database.database, 'database') + super(url_source_unittest, self).setUp() + self.god.stub_function(source.urllib2, 'urlopen') self.addinfourl_mock = self.god.create_mock_class( source.urllib2.addinfourl, 'addinfourl') self.mime_mock = self.god.create_mock_class( httplib.HTTPMessage, 'HTTPMessage') - self.old_tz = getattr(os.environ, 'TZ', '') - os.environ['TZ'] = 'America/Los_Angeles' - time.tzset() - - - def tearDown(self): - self.god.unstub_all() - os.environ['TZ'] = self.old_tz - time.tzset() def test_get_new_files(self): @@ -270,5 +271,95 @@ self.god.check_playback() +class directory_source_unittest(common_source): + """ + Unit test class for directory_source. + """ + def setUp(self): + super(directory_source_unittest, self).setUp() + + self.god.stub_function(os, 'listdir') + self._stat_mock = self.god.create_mock_function('stat') + + + @staticmethod + def _get_stat_result(mode=0644, ino=12345, dev=12345, nlink=1, uid=1000, + gid=1000, size=10, atime=123, mtime=123, ctime=123): + """ + Build an os.stat_result() instance with many default values. + + @param mode, ino, dev, nlink, uid, gid, size, atime, mtime, ctime: + See help(os.stat_result). + """ + return os.stat_result((mode, ino, dev, nlink, uid, gid, size, atime, + mtime, ctime)) + + + def test_get_new_files_invalid_path(self): + """ + Test directory_source.get_new_files() on an invalid path. + """ + path = '/some/invalid/path' + os.listdir.expect_call(path).and_raises(OSError('Error')) + + s = source.directory_source(self.db_mock, path) + self.assertRaises(OSError, s.get_new_files) + self.god.check_playback() + + + def test_get_new_files_stat_fails(self): + """ + Test directory_source.get_new_files() when stat fails. 
+ """ + path = '/some/valid/path' + os.listdir.expect_call(path).and_return(['file1', 'file2']) + (self._stat_mock.expect_call(os.path.join(path, 'file1')) + .and_raises(OSError('Error'))) + stat_result = self._get_stat_result(size=1010, mtime=123) + file2_full_path = os.path.join(path, 'file2') + self._stat_mock.expect_call(file2_full_path).and_return(stat_result) + + self.db_mock.get_dictionary.expect_call().and_return({}) + + s = source.directory_source(self.db_mock, path) + expected = {'file2': source.database.item(file2_full_path, 1010, 123)} + self.assertEquals(expected, s.get_new_files(_stat_func=self._stat_mock)) + self.god.check_playback() + + + def test_get_new_files_success(self): + """ + Test directory_source.get_new_files() success. + """ + path = '/some/valid/path' + file1_full_path = os.path.join(path, 'file1') + file2_full_path = os.path.join(path, 'file2') + file3_full_path = os.path.join(path, 'file3') + + os.listdir.expect_call(path).and_return(['file1', 'file2', 'file3']) + + stat_result = self._get_stat_result(size=1010, mtime=123) + self._stat_mock.expect_call(file1_full_path).and_return(stat_result) + + stat_result = self._get_stat_result(size=1020, mtime=1234) + self._stat_mock.expect_call(file2_full_path).and_return(stat_result) + + stat_result = self._get_stat_result(size=1030, mtime=12345) + self._stat_mock.expect_call(file3_full_path).and_return(stat_result) + + known_files = { + 'file2': source.database.item(file2_full_path, 1020, 1234), + } + self.db_mock.get_dictionary.expect_call().and_return(known_files) + + s = source.directory_source(self.db_mock, path) + expected = { + 'file1': source.database.item(file1_full_path, 1010, 123), + 'file3': source.database.item(file3_full_path, 1030, 12345), + } + self.assertEquals(expected, s.get_new_files(_stat_func=self._stat_mock)) + self.god.check_playback() + + if __name__ == "__main__": unittest.main() _______________________________________________ Autotest mailing list [email protected] 
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest
