Hello community,
here is the log from the commit of package obs-service-tar_scm for
openSUSE:Factory checked in at 2012-02-17 12:18:38
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/obs-service-tar_scm (Old)
and /work/SRC/openSUSE:Factory/.obs-service-tar_scm.new (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "obs-service-tar_scm", Maintainer is ""
Changes:
--------
--- /work/SRC/openSUSE:Factory/obs-service-tar_scm/obs-service-tar_scm.changes
2012-01-04 07:25:39.000000000 +0100
+++
/work/SRC/openSUSE:Factory/.obs-service-tar_scm.new/obs-service-tar_scm.changes
2012-02-17 12:18:39.000000000 +0100
@@ -1,0 +2,37 @@
+Thu Feb 16 15:23:35 GMT 2012 - [email protected]
+
+- When the cache is used, output location of repo in the cache
+
+-------------------------------------------------------------------
+Tue Feb 14 16:52:19 GMT 2012 - [email protected]
+
+- add new 'versionformat' option to determine how version is
+ extracted via git show --pretty=...
+- support caching of cloned repositories to speed up fetch
+ from upstream
+
+-------------------------------------------------------------------
+Mon Feb 13 15:52:19 GMT 2012 - [email protected]
+
+- Add test suite
+- Fix --subdir with --scm svn
+- Fix --scm bzr
+
+-------------------------------------------------------------------
+Mon Feb 13 10:51:19 UTC 2012 - [email protected]
+
+- patch license to follow spdx.org standard
+
+-------------------------------------------------------------------
+Tue Jan 24 15:46:17 UTC 2012 - [email protected]
+
+- add new option to specify a subset of files/subdirectories to
+ pack in the tar ball
+
+-------------------------------------------------------------------
+Tue Jan 24 13:26:19 UTC 2012 - [email protected]
+
+- Checking out a specific revision cannot work when only the latest
+ version is cloned.
+
+-------------------------------------------------------------------
New:
----
bzrfixtures.py
bzrtests.py
commontests.py
fixtures.py
gitfixtures.py
githgtests.py
gittests.py
hgfixtures.py
hgtests.py
scm-wrapper
scmlogs.py
svnfixtures.py
svntests.py
tar_scm.rc
test.py
testassertions.py
testenv.py
utils.py
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ obs-service-tar_scm.spec ++++++
--- /var/tmp/diff_new_pack.G30PbZ/_old 2012-02-17 12:18:42.000000000 +0100
+++ /var/tmp/diff_new_pack.G30PbZ/_new 2012-02-17 12:18:42.000000000 +0100
@@ -1,7 +1,7 @@
#
# spec file for package obs-service-tar_scm
#
-# Copyright (c) 2011 SUSE LINUX Products GmbH, Nuernberg, Germany.
+# Copyright (c) 2012 SUSE LINUX Products GmbH, Nuernberg, Germany.
#
# All modifications and additions to the file contributed by third parties
# remain the property of their copyright owners, unless otherwise agreed
@@ -16,17 +16,44 @@
#
+%define service tar_scm
-Name: obs-service-tar_scm
-License: GPL v2 or later
-Group: Development/Tools/Building
+Name: obs-service-%{service}
Summary: An OBS source service: checkout or update a tar ball from
svn/git/hg
-Url:
https://build.opensuse.org/package/show?package=obs-service-tar_scm&project=openSUSE%3ATools
-Version: 0.2.1
-Release: 1
-Source: tar_scm
-Source1: tar_scm.service
-Requires: subversion git mercurial bzr
+License: GPL-2.0+
+Group: Development/Tools/Building
+Url:
https://build.opensuse.org/package/show?package=obs-service-%{service}&project=openSUSE%3ATools
+Version: 0.2.3
+Release: 0
+Source: %{service}
+Source1: %{service}.service
+Source2: %{service}.rc
+
+# test suite files
+Source100: bzrfixtures.py
+Source101: bzrtests.py
+Source102: commontests.py
+Source103: fixtures.py
+Source104: gitfixtures.py
+Source105: githgtests.py
+Source106: gittests.py
+Source107: hgfixtures.py
+Source108: hgtests.py
+Source109: scmlogs.py
+Source110: svnfixtures.py
+Source111: svntests.py
+Source112: testassertions.py
+Source113: testenv.py
+Source114: test.py
+Source115: utils.py
+Source116: scm-wrapper
+
+Requires: bzr git mercurial subversion
+BuildRequires: bzr
+BuildRequires: git
+BuildRequires: mercurial
+BuildRequires: python >= 2.6
+BuildRequires: subversion
BuildRoot: %{_tmppath}/%{name}-%{version}-build
BuildArch: noarch
@@ -46,9 +73,20 @@
install -m 0755 %{SOURCE0} $RPM_BUILD_ROOT/usr/lib/obs/service
install -m 0644 %{SOURCE1} $RPM_BUILD_ROOT/usr/lib/obs/service
+mkdir -p $RPM_BUILD_ROOT/etc/obs/services
+install -m 0644 %{SOURCE2} $RPM_BUILD_ROOT/etc/obs/services/%{service}
+
+%check
+chmod +x $RPM_SOURCE_DIR/scm-wrapper
+: Running the test suite. Please be patient - this takes a few minutes ...
+python $RPM_SOURCE_DIR/test.py
+
%files
%defattr(-,root,root)
%dir /usr/lib/obs
/usr/lib/obs/service
+%dir /etc/obs
+%dir /etc/obs/services
+%config(noreplace) /etc/obs/services/*
%changelog
++++++ bzrfixtures.py ++++++
#!/usr/bin/python
import os
from fixtures import Fixtures
from utils import mkfreshdir, run_bzr
class BzrFixtures(Fixtures):
def init(self):
self.create_repo()
self.create_commits(2)
def run(self, cmd):
return run_bzr(self.repo_path, cmd)
def create_repo(self):
os.makedirs(self.repo_path)
os.chdir(self.repo_path)
self.run('init')
self.run('whoami "%s"' % self.name_and_email)
self.wd = self.repo_path
print "created repo", self.repo_path
def do_commit(self, newly_created):
self.run('add .')
self.run('commit -m%d' % self.next_commit_rev)
def record_rev(self, rev_num):
self.revs[rev_num] = str(rev_num)
self.scmlogs.annotate("Recorded rev %d" % rev_num)
++++++ bzrtests.py ++++++
#!/usr/bin/python
from commontests import CommonTests
from bzrfixtures import BzrFixtures
from utils import run_bzr
class BzrTests(CommonTests):
    """Runs the common test cases plus versionformat tests against bzr."""

    scm = 'bzr'
    fixtures_class = BzrFixtures
    initial_clone_command = 'bzr checkout'
    update_cache_command = 'bzr update'

    def default_version(self):
        """Version expected when none is requested: the id of commit 2."""
        return self.rev(2)

    def test_versionformat_rev(self):
        fmt = 'myrev%r.svn'
        self.tar_scm_std('--versionformat', fmt)
        expected = self.basename(version='myrev2.svn')
        self.assertTarOnly(expected)

    def test_version_versionformat(self):
        # --versionformat is expected to win over an explicit --version
        fmt = 'myrev%r.svn'
        self.tar_scm_std('--version', '3.0', '--versionformat', fmt)
        expected = self.basename(version='myrev2.svn')
        self.assertTarOnly(expected)

    def test_versionformat_revision(self):
        self.fixtures.create_commits(4)
        wanted_rev = self.rev(2)
        self.tar_scm_std('--versionformat', 'foo%r', '--revision', wanted_rev)
        top = self.basename(version='foo2')
        tar_handle = self.assertTarOnly(top)
        self.assertTarMemberContains(tar_handle, top + '/a', '2')
++++++ commontests.py ++++++
#!/usr/bin/python
from pprint import pprint, pformat
from testassertions import TestAssertions
from testenv import TestEnvironment
from utils import mkfreshdir
class CommonTests(TestEnvironment, TestAssertions):
    """
    Test cases shared by all SCM backends.

    Subclasses in this test suite provide ``scm``, ``fixtures_class``,
    ``initial_clone_command``, ``update_cache_command`` and
    ``default_version()``.
    """

    def basename(self, name='repo', version=None):
        """Return the expected '<name>-<version>' tarball base name.

        When *version* is None the subclass's default_version() is used.
        """
        if version is None:
            version = self.default_version()
        return '%s-%s' % (name, version)

    def test_plain(self):
        self.tar_scm_std()
        self.assertTarOnly(self.basename())

    def test_exclude(self):
        self.tar_scm_std('--exclude', '.' + self.scm)
        self.assertTarOnly(self.basename())

    def test_subdir(self):
        self.tar_scm_std('--subdir', self.fixtures.subdir)
        self.assertTarOnly(self.basename(), tarchecker=self.assertSubdirTar)

    def test_history_depth_obsolete(self):
        (stdout, stderr, ret) = self.tar_scm_std('--history-depth', '1')
        self.assertRegexpMatches(stdout, 'obsolete')
        # self.assertTarOnly(self.basename())
        # self.assertRegexpMatches(self.scmlogs.read()[0], '^%s clone --depth=1')

    # def test_history_depth_full(self):
    #     self.tar_scm_std('--history-depth', 'full')
    #     self.assertTarOnly(self.basename())
    #     self.assertRegexpMatches(self.scmlogs.read()[0], '^git clone --depth=999999+')

    def test_filename(self):
        name = 'myfilename'
        self.tar_scm_std('--filename', name)
        self.assertTarOnly(self.basename(name=name))

    def test_version(self):
        version = '0.5'
        self.tar_scm_std('--version', version)
        self.assertTarOnly(self.basename(version=version))

    def test_filename_version(self):
        filename = 'myfilename'
        version = '0.6'
        self.tar_scm_std('--filename', filename, '--version', version)
        self.assertTarOnly(self.basename(filename, version))

    def test_revision_nop(self):
        self.tar_scm_std('--revision', self.rev(2))
        th = self.assertTarOnly(self.basename())
        self.assertTarMemberContains(th, self.basename() + '/a', '2')

    def test_revision(self):
        self._revision()

    def test_revision_no_cache(self):
        self._revision(use_cache=False)

    def test_revision_subdir(self):
        self._revision(use_subdir=True)

    def test_revision_subdir_no_cache(self):
        self._revision(use_cache=False, use_subdir=True)

    def _revision(self, use_cache=True, use_subdir=False):
        """Request the same revision repeatedly while new commits arrive."""
        version = '3.0'
        args_tag2 = [
            '--version', version,
            '--revision', self.rev(2),
        ]
        if use_subdir:
            args_tag2 += [ '--subdir', self.fixtures.subdir ]
        self._sequential_calls_with_revision(
            version,
            [
                (0, args_tag2, '2', False),
                (0, args_tag2, '2', use_cache),
                (2, args_tag2, '2', use_cache),
                (0, args_tag2, '2', use_cache),
                (2, args_tag2, '2', use_cache),
                (0, args_tag2, '2', use_cache),
            ],
            use_cache
        )

    def test_revision_master_alternating(self):
        self._revision_master_alternating()

    def test_revision_master_alternating_no_cache(self):
        self._revision_master_alternating(use_cache=False)

    def test_revision_master_alternating_subdir(self):
        self._revision_master_alternating(use_subdir=True)

    def test_revision_master_alternating_subdir_no_cache(self):
        self._revision_master_alternating(use_cache=False, use_subdir=True)

    def _revision_master_alternating(self, use_cache=True, use_subdir=False):
        """Alternate between a fixed revision and the latest commit."""
        version = '4.0'
        args_head = [
            '--version', version,
        ]
        if use_subdir:
            args_head += [ '--subdir', self.fixtures.subdir ]
        args_tag2 = args_head + [ '--revision', self.rev(2) ]
        self._sequential_calls_with_revision(
            version,
            [
                (0, args_tag2, '2', False),
                (0, args_head, '2', use_cache),
                (2, args_tag2, '2', use_cache),
                (0, args_head, '4', use_cache),
                (2, args_tag2, '2', use_cache),
                (0, args_head, '6', use_cache),
                (0, args_tag2, '2', use_cache),
            ],
            use_cache
        )

    def _sequential_calls_with_revision(self, version, calls, use_cache=True):
        """
        Run tar_scm once per entry of *calls* and verify each result.

        Each entry is a tuple (new_commits, args, expected,
        expect_cache_hit): the number of commits to create first, the
        extra tar_scm arguments, the expected content of the checked
        tar member, and whether the SCM log should show a cache update
        rather than an initial clone.
        """
        mkfreshdir(self.pkgdir)
        basename = self.basename(version = version)

        if not use_cache:
            self.disableCache()

        # Iterate rather than pop(0) so the caller's list is left intact.
        for new_commits, args, expected, expect_cache_hit in calls:
            if new_commits > 0:
                self.fixtures.create_commits(new_commits)
            self.scmlogs.annotate("about to run: " + pformat(args))
            self.scmlogs.annotate("expecting tar to contain: " + expected)
            self.tar_scm_std(*args)
            logpath = self.scmlogs.current_log_path
            loglines = self.scmlogs.read()
            if expect_cache_hit:
                self.assertRanUpdate(logpath, loglines)
            else:
                self.assertRanInitialClone(logpath, loglines)

            if self.fixtures.subdir in args:
                th = self.assertTarOnly(basename,
                                        tarchecker=self.assertSubdirTar)
                tarent = 'b'
            else:
                th = self.assertTarOnly(basename)
                tarent = 'a'

            self.assertTarMemberContains(th, basename + '/' + tarent, expected)

            # Use a separate invocation log for the next call.
            self.scmlogs.next()
            self.postRun()

    def test_switch_revision_and_subdir(self):
        self._switch_revision_and_subdir()

    def test_switch_revision_and_subdir_no_cache(self):
        self._switch_revision_and_subdir(use_cache=False)

    def _switch_revision_and_subdir(self, use_cache=True):
        """Alternate between a fixed revision and a --subdir export."""
        version = '5.0'
        args = [
            '--version', version,
        ]
        args_subdir = args + [ '--subdir', self.fixtures.subdir ]
        args_tag2 = args + [ '--revision', self.rev(2) ]
        self._sequential_calls_with_revision(
            version,
            [
                (0, args_tag2, '2', False),
                # for svn the first switch to --subdir is expected to
                # trigger a fresh checkout rather than a cache update
                (0, args_subdir, '2', use_cache and self.scm != 'svn'),
                (2, args_tag2, '2', use_cache),
                (0, args_subdir, '4', use_cache),
                (2, args_tag2, '2', use_cache),
                (0, args_subdir, '6', use_cache),
                (0, args_tag2, '2', use_cache),
            ],
            use_cache
        )
++++++ fixtures.py ++++++
#!/usr/bin/python
import os
import shutil
class Fixtures:
    """
    Base class for the per-SCM repository fixtures.

    Subclasses must implement init(), do_commit(newly_created) and
    record_rev(rev_num), and must set self.wd (the directory in which
    commits are prepared) before create_commit() is called.
    """

    name = 'tar_scm test suite'
    email = 'root@localhost'
    name_and_email = '%s <%s>' % (name, email)

    subdir = 'subdir'
    subdir1 = 'subdir1'
    subdir2 = 'subdir2'
    # Sequence number the next commit will get; also written into the files.
    next_commit_rev = 1

    def __init__(self, container_dir, scmlogs):
        """
        container_dir -- directory under which the repository is created
        scmlogs       -- object providing annotate(msg) for log annotations
        """
        self.container_dir = container_dir
        self.scmlogs = scmlogs
        self.repo_path = self.container_dir + '/repo'
        self.repo_url = 'file://' + self.repo_path

        # Keys are integer commit sequence numbers (starting at 1);
        # values can be passed to --revision
        self.revs = { }

    def setup(self):
        """Remove any previous repository and build a fresh one."""
        print(self.__class__.__name__ + ": setting up fixtures")
        self.init_fixtures_dir()
        self.init()

    def init_fixtures_dir(self):
        """Delete the repository directory if it already exists."""
        if os.path.exists(self.repo_path):
            shutil.rmtree(self.repo_path)

    def init(self):
        """Create the repository; must be overridden by subclasses."""
        raise NotImplementedError(
            self.__class__.__name__ + " didn't implement init()")

    def create_commits(self, num_commits):
        """Create and record *num_commits* new commits (no-op if <= 0)."""
        self.scmlogs.annotate("Creating %d commits ..." % num_commits)
        if num_commits <= 0:
            return

        new_rev = None
        for _ in range(num_commits):
            new_rev = self.create_commit()
            self.record_rev(new_rev)

        self.scmlogs.annotate("Created %d commits; now at %s" % (num_commits,
                                                                 new_rev))

    def create_commit(self):
        """Make one commit in self.wd and return its sequence number."""
        os.chdir(self.wd)
        newly_created = self.prep_commit()
        self.do_commit(newly_created)
        new_rev = self.next_commit_rev
        self.next_commit_rev += 1
        return new_rev

    def prep_commit(self):
        """
        Caller should ensure correct cwd.
        Returns list of newly created files.
        """
        newly_created = [ ]

        if not os.path.exists('a'):
            newly_created.append('a')

        if not os.path.exists(self.subdir):
            os.mkdir(self.subdir)
            # This will take care of adding subdir/b too
            newly_created.append(self.subdir)

        # Both files always hold the sequence number of the pending commit.
        for fn in ('a', self.subdir + '/b'):
            with open(fn, 'w') as f:
                f.write(str(self.next_commit_rev))

        return newly_created
++++++ gitfixtures.py ++++++
#!/usr/bin/python
import os
from fixtures import Fixtures
from utils import mkfreshdir, run_git
class GitFixtures(Fixtures):
def init(self):
self.create_repo()
self.timestamps = { }
self.sha1s = { }
self.create_commits(2)
def run(self, cmd):
return run_git(self.repo_path, cmd)
def create_repo(self):
os.makedirs(self.repo_path)
os.chdir(self.repo_path)
self.run('init')
self.run('config user.name test')
self.run('config user.email [email protected]')
self.wd = self.repo_path
print "created repo", self.repo_path
def do_commit(self, newly_created):
self.run('add .')
self.run('commit -m%d' % self.next_commit_rev)
def get_metadata(self, formatstr):
return self.run('log -n1 --pretty=format:"%s"' % formatstr)[0]
def record_rev(self, rev_num):
tag = 'tag' + str(rev_num)
self.run('tag ' + tag)
self.revs[rev_num] = tag
self.timestamps[tag] = self.get_metadata('%at')
self.sha1s[tag] = self.get_metadata('%h')
self.scmlogs.annotate(
"Recorded rev %d: id %s, timestamp %s, SHA1 %s" % \
(rev_num,
tag,
self.timestamps[tag],
self.sha1s[tag])
)
++++++ githgtests.py ++++++
#!/usr/bin/python
import os
from commontests import CommonTests
from utils import run_hg
class GitHgTests(CommonTests):
    """
    Versionformat tests shared by the git and hg backends.

    Subclasses provide ``abbrev_hash_format`` and ``timestamp_format``.
    """

    mixed_version_template = '%s.master.%s'

    def test_versionformat_abbrevhash(self):
        self.tar_scm_std('--versionformat', self.abbrev_hash_format)
        expected_version = self.sha1s(self.rev(2))
        self.assertTarOnly(self.basename(version=expected_version))

    def test_versionformat_timestamp(self):
        self.tar_scm_std('--versionformat', self.timestamp_format)
        expected_version = self.timestamps(self.rev(2))
        self.assertTarOnly(self.basename(version=expected_version))

    def _mixed_version_format(self):
        """Format string combining the timestamp and abbreviated hash."""
        parts = (self.timestamp_format, self.abbrev_hash_format)
        return self.mixed_version_template % parts

    def _mixed_version(self):
        """Version string the mixed format should yield for commit 2."""
        rev2 = self.rev(2)
        parts = (self.timestamps(rev2), self.sha1s(rev2))
        return self.mixed_version_template % parts

    def test_versionformat_mixed(self):
        self.tar_scm_std('--versionformat', self._mixed_version_format())
        expected = self.basename(version=self._mixed_version())
        self.assertTarOnly(expected)

    def test_version_versionformat(self):
        self.tar_scm_std('--version', '3.0',
                         '--versionformat', self._mixed_version_format())
        expected = self.basename(version=self._mixed_version())
        self.assertTarOnly(expected)

    def test_versionformat_revision(self):
        self.fixtures.create_commits(4)
        self.tar_scm_std('--versionformat', self.abbrev_hash_format,
                         '--revision', self.rev(2))
        top = self.basename(version=self.sha1s(self.rev(2)))
        tar_handle = self.assertTarOnly(top)
        self.assertTarMemberContains(tar_handle, top + '/a', '2')
++++++ gittests.py ++++++
#!/usr/bin/python
from githgtests import GitHgTests
from gitfixtures import GitFixtures
from utils import run_git
class GitTests(GitHgTests):
    """Runs the shared git/hg test cases against git."""

    scm = 'git'
    fixtures_class = GitFixtures

    # Patterns matched against the SCM invocation log
    initial_clone_command = 'git clone'
    update_cache_command = 'git fetch'

    # Format strings passed to --versionformat
    abbrev_hash_format = '%h'
    timestamp_format = '%at'

    def default_version(self):
        """Version expected when none is requested: timestamp of commit 2."""
        second_rev = self.rev(2)
        return self.timestamps(second_rev)
++++++ hgfixtures.py ++++++
#!/usr/bin/python
import os
from fixtures import Fixtures
from utils import mkfreshdir, run_hg
class HgFixtures(Fixtures):
def init(self):
self.create_repo()
self.timestamps = { }
self.sha1s = { }
self.create_commits(2)
def run(self, cmd):
return run_hg(self.repo_path, cmd)
def create_repo(self):
os.makedirs(self.repo_path)
os.chdir(self.repo_path)
self.run('init')
c = open('.hg/hgrc', 'w')
c.write("[ui]\nusername = %s\n" % self.name_and_email)
c.close()
self.wd = self.repo_path
print "created repo", self.repo_path
def do_commit(self, newly_created):
self.run('add .')
self.run('commit -m%d' % self.next_commit_rev)
def get_metadata(self, formatstr):
return self.run('log -l1 --template "%s"' % formatstr)[0]
def record_rev(self, rev_num):
tag = str(rev_num - 1) # hg starts counting changesets at 0
self.revs[rev_num] = tag
self.timestamps[tag] = self.get_metadata('{date}')
self.sha1s[tag] = self.get_metadata('{node|short}')
self.scmlogs.annotate(
"Recorded rev %d: id %s, timestamp %s, SHA1 %s" % \
(rev_num,
tag,
self.timestamps[tag],
self.sha1s[tag])
)
++++++ hgtests.py ++++++
#!/usr/bin/python
from githgtests import GitHgTests
from hgfixtures import HgFixtures
from utils import run_hg
class HgTests(GitHgTests):
    """Runs the shared git/hg test cases against hg."""

    scm = 'hg'
    fixtures_class = HgFixtures

    # Patterns matched against the SCM invocation log
    initial_clone_command = 'hg clone'
    update_cache_command = 'hg pull'

    # Format strings passed to --versionformat
    abbrev_hash_format = '{node|short}'
    timestamp_format = '{date}'

    def default_version(self):
        """Version expected when none is requested: the id of commit 2."""
        second_rev = self.rev(2)
        return second_rev
++++++ scm-wrapper ++++++
#!/bin/bash

# Wrapper around SCM to enable behaviour verification testing
# on tar_scm's repository caching code.  This is cleaner than
# writing tests which look inside the cache, because then they
# become coupled to the cache's implementation, and require
# knowledge of where the cache lives etc.
#
# The wrapper is reached through a symlink named after the real SCM
# binary; it appends the invocation to $SCM_INVOCATION_LOG and then
# hands over to /usr/bin/<name>.

# Quote $0 so that a path containing whitespace is not word-split.
me=$(basename "$0")

if [ -z "$SCM_INVOCATION_LOG" ]; then
    cat <<EOF >&2
\$SCM_INVOCATION_LOG must be set before calling $0.
It should be invoked from the test suite, not directly.
EOF
    exit 1
fi

if [ "$me" = 'scm-wrapper' ]; then
    echo "$me should not be invoked directly, only via symlink" >&2
    exit 1
fi

echo "$me $*" >> "$SCM_INVOCATION_LOG"

# exec replaces this shell, so the SCM's exit status is passed through.
exec "/usr/bin/$me" "$@"
++++++ scmlogs.py ++++++
#!/usr/bin/python
import glob
import os
class ScmInvocationLogs:
    """
    Provides log files which track invocations of SCM binaries.  The
    tracking is done via a wrapper around SCM to enable behaviour
    verification testing on tar_scm's repository caching code.  This
    is cleaner than writing tests which look inside the cache, because
    then they become coupled to the cache's implementation, and
    require knowledge of where the cache lives etc.

    One instance should be constructed per unit test.  If the test
    invokes the SCM binary multiple times, invoke next() in between
    each, so that a separate log file is used for each invocation -
    this allows more accurate fine-grained assertions on the
    invocation log.

    next() must be called before annotate() or read(), since it is
    what sets current_log_path.
    """

    @classmethod
    def setup_bin_wrapper(cls, scm, tmp_dir):
        """
        Ensure <tmp_dir>/wrappers/<scm> is a symlink to scm-wrapper and
        that the wrappers directory is at the front of $PATH.
        """
        cls.wrapper_dir = tmp_dir + '/wrappers'

        if not os.path.exists(cls.wrapper_dir):
            os.makedirs(cls.wrapper_dir)

        wrapper = cls.wrapper_dir + '/' + scm
        # lexists() rather than exists(): an already-present but dangling
        # symlink must not be recreated, or os.symlink() would fail.
        if not os.path.lexists(wrapper):
            # Relative target: resolves to scm-wrapper two levels above
            # the wrappers directory.
            os.symlink('../../scm-wrapper', wrapper)

        path = os.getenv('PATH')
        prepend = cls.wrapper_dir + ':'

        if not path.startswith(prepend):
            new_path = prepend + path
            os.environ['PATH'] = new_path

    def __init__(self, scm, test_dir):
        """
        scm      -- name of the SCM, used as log file name prefix
        test_dir -- directory in which the log files are kept

        Any log files left over from an earlier run are deleted.
        """
        self.scm = scm
        self.test_dir = test_dir
        self.counter = 0
        self.unlink_existing_logs()

    def get_log_file_template(self):
        """Return the log file name with a '%s' slot for the identifier."""
        return '%s-invocation-%%s.log' % self.scm

    def get_log_path_template(self):
        """Return get_log_file_template() joined onto test_dir."""
        return os.path.join(self.test_dir, self.get_log_file_template())

    def unlink_existing_logs(self):
        """Delete every file matching the log path template."""
        pat = self.get_log_path_template() % '*'
        for log in glob.glob(pat):
            os.unlink(log)

    def get_log_file(self, identifier):
        """Return the log file name for the current counter value."""
        if identifier:
            identifier = '-' + identifier
        return self.get_log_file_template() % ('%02d%s' % (self.counter,
                                                           identifier))

    def get_log_path(self, identifier):
        """Return get_log_file(identifier) joined onto test_dir."""
        return os.path.join(self.test_dir, self.get_log_file(identifier))

    def next(self, identifier=''):
        """
        Switch to a new log file and export its path.

        Raises RuntimeError if the new log file already exists.
        """
        self.counter += 1
        self.current_log_path = self.get_log_path(identifier)
        if os.path.exists(self.current_log_path):
            raise RuntimeError("%s already existed?!" % self.current_log_path)
        # Read by scm-wrapper to find out where to log.
        os.putenv('SCM_INVOCATION_LOG', self.current_log_path)

    def annotate(self, msg):
        """Append '# <msg>' to the current log and echo msg to stdout."""
        with open(self.current_log_path, 'a') as log:
            log.write('# ' + msg + "\n")
        print(msg)

    def read(self):
        """
        Return the lines of the current log as a list, or the string
        '<no SCM log>' (with the SCM name filled in) if it does not exist.
        """
        if not os.path.exists(self.current_log_path):
            return '<no %s log>' % self.scm
        with open(self.current_log_path) as log:
            return log.readlines()
++++++ svnfixtures.py ++++++
#!/usr/bin/python
import os
from fixtures import Fixtures
from utils import mkfreshdir, quietrun, run_svn
class SvnFixtures(Fixtures):
def init(self):
self.wd_path = self.container_dir + '/wd'
self.create_repo()
self.checkout_repo()
self.added = { }
self.timestamps = { }
self.create_commits(2)
def run(self, cmd):
return run_svn(self.wd_path, cmd)
def create_repo(self):
quietrun('svnadmin create ' + self.repo_path)
print "created repo", self.repo_path
def checkout_repo(self):
mkfreshdir(self.wd_path)
quietrun('svn checkout %s %s' % (self.repo_url, self.wd_path))
self.wd = self.wd_path
def do_commit(self, newly_created):
for new in newly_created:
if not new in self.added:
self.run('add ' + new)
self.added[new] = True
self.run('commit -m%d' % self.next_commit_rev)
def get_metadata(self, formatstr):
return self.run('log -n1' % formatstr)[0]
def record_rev(self, rev_num):
self.revs[rev_num] = str(rev_num)
self.scmlogs.annotate("Recorded rev %d" % rev_num)
++++++ svntests.py ++++++
#!/usr/bin/python
from commontests import CommonTests
from svnfixtures import SvnFixtures
from utils import run_svn
class SvnTests(CommonTests):
    """Runs the common test cases plus versionformat tests against svn."""

    scm = 'svn'
    fixtures_class = SvnFixtures

    # Regular expressions matched against the SCM invocation log
    initial_clone_command = 'svn (co|checkout) '
    update_cache_command = 'svn up(date)?'

    def default_version(self):
        """Version expected when none is requested: the id of commit 2."""
        return self.rev(2)

    def test_versionformat_rev(self):
        fmt = 'myrev%r.svn'
        self.tar_scm_std('--versionformat', fmt)
        expected = self.basename(version='myrev2.svn')
        self.assertTarOnly(expected)

    def test_version_versionformat(self):
        # --versionformat is expected to win over an explicit --version
        fmt = 'myrev%r.svn'
        self.tar_scm_std('--version', '3.0', '--versionformat', fmt)
        expected = self.basename(version='myrev2.svn')
        self.assertTarOnly(expected)

    def test_versionformat_revision(self):
        self.fixtures.create_commits(4)
        wanted_rev = self.rev(2)
        self.tar_scm_std('--versionformat', 'foo%r', '--revision', wanted_rev)
        top = self.basename(version='foo2')
        tar_handle = self.assertTarOnly(top)
        self.assertTarMemberContains(tar_handle, top + '/a', '2')
++++++ tar_scm ++++++
++++ 704 lines (skipped)
++++ between /work/SRC/openSUSE:Factory/obs-service-tar_scm/tar_scm
++++ and /work/SRC/openSUSE:Factory/.obs-service-tar_scm.new/tar_scm
++++++ tar_scm.rc ++++++
#
# Define a cache directory here to avoid repeating downloads of the
# same file. This works on the server and on the client side.
#
# Note that this file can be overridden with per-user config placed
# in ~/.obs/tar_scm
#
# WARNING: you need to create three directories inside, when changing from
default:
# mkdir -p repo{,url} incoming
#
#CACHEDIRECTORY="/var/cache/obs/tar_scm"
++++++ tar_scm.service ++++++
--- /var/tmp/diff_new_pack.G30PbZ/_old 2012-02-17 12:18:42.000000000 +0100
+++ /var/tmp/diff_new_pack.G30PbZ/_new 2012-02-17 12:18:42.000000000 +0100
@@ -16,6 +16,12 @@
<parameter name="subdir">
<description>package just a sub directory</description>
</parameter>
+ <parameter name="version">
+ <description>Specify version to be used in tarball. Defaults to
automatically detected value formatted by versionformat parameter. If using
keep-source, you should set this to a fixed constant which will be used to name
the checked out directory.</description>
+ </parameter>
+ <parameter name="versionformat">
+ <description>Auto-generate version from checked out source using this
format string. For git, value is passed via git show --pretty=format:...
(default '%at'); for hg, via hg log --template=... (default '{rev}'); for bzr
and svn, %r is revision (default '%r'). Overrides tarball name defined by
version parameter.</description>
+ </parameter>
<parameter name="versionprefix">
<description>specify a base version as prefix.</description>
</parameter>
@@ -23,13 +29,13 @@
<description>specify a revision</description>
</parameter>
<parameter name="filename">
- <description>base file name to be created</description>
+ <description>name of package - used together with version to determine
tarball name</description>
</parameter>
<parameter name="exclude">
- <description>for sepcifing excludes when creating the tar
ball</description>
+ <description>for specifying excludes when creating the tar
ball</description>
</parameter>
- <parameter name="version">
- <description>version to be used in tar. Setting it to an empty string is
disabling the version tag.</description>
+ <parameter name="include">
+ <description>for specifying subset of files/subdirectories to pack in the
tar ball</description>
</parameter>
<parameter name="package-meta">
<description>Package the meta data of SCM to allow the user or OBS to
update after un-tar</description>
++++++ test.py ++++++
#!/usr/bin/python
import sys
import unittest
from gittests import GitTests
from svntests import SvnTests
from hgtests import HgTests
from bzrtests import BzrTests
if __name__ == '__main__':
    # Collect the tests of every SCM backend into a single suite.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for testclass in (SvnTests, GitTests, HgTests, BzrTests):
        suite.addTests(loader.loadTestsFromTestCase(testclass))

    runner_args = {
        #'verbosity' : 2,
    }
    if sys.version_info[:2] >= (2, 7):
        # New in 2.7
        runner_args['buffer'] = True
        #runner_args['failfast'] = True

    outcome = unittest.TextTestRunner(**runner_args).run(suite)

    # Exit status 0 on success, 1 otherwise.
    sys.exit(0 if outcome.wasSuccessful() else 1)
++++++ testassertions.py ++++++
#!/usr/bin/python
import os
from pprint import pprint, pformat
import re
import tarfile
import unittest
line_start = '(^|\n)'
class TestAssertions(unittest.TestCase):
    """
    Custom assertions for checking tar_scm output.

    Methods here read self.outdir, self.initial_clone_command and
    self.update_cache_command, which must be supplied by the class
    this is combined with.
    """

    ######################################################################
    # backported from 2.7 just in case we're running on an older Python
    def assertRegexpMatches(self, text, expected_regexp, msg=None):
        """Fail the test unless the text matches the regular expression."""
        if isinstance(expected_regexp, basestring):
            expected_regexp = re.compile(expected_regexp)
        if not expected_regexp.search(text):
            msg = msg or "Regexp didn't match"
            msg = '%s: %r not found in %r' % (msg, expected_regexp.pattern,
                                              text)
            raise self.failureException(msg)

    def assertNotRegexpMatches(self, text, unexpected_regexp, msg=None):
        """Fail the test if the text matches the regular expression."""
        if isinstance(unexpected_regexp, basestring):
            unexpected_regexp = re.compile(unexpected_regexp)
        match = unexpected_regexp.search(text)
        if match:
            msg = msg or "Regexp matched"
            msg = '%s: %r matches %r in %r' % (msg,
                                               text[match.start():match.end()],
                                               unexpected_regexp.pattern,
                                               text)
            raise self.failureException(msg)
    ######################################################################

    def assertNumDirents(self, dir, expected, msg = ''):
        """Assert *dir* holds *expected* entries; return the entry names."""
        dirents = os.listdir(dir)
        got = len(dirents)
        if len(msg) > 0: msg += "\n"
        msg += 'expected %d file(s), got %d: %s' % (expected, got,
                                                    pformat(dirents))
        self.assertEqual(expected, got, msg)
        return dirents

    def assertNumTarEnts(self, tar, expected, msg = ''):
        """
        Assert the tar file has *expected* members.

        Returns (open tarfile handle, list of members); the handle is
        left open so that callers can extract members from it.
        """
        self.assertTrue(tarfile.is_tarfile(tar))
        th = tarfile.open(tar)
        tarents = th.getmembers()
        got = len(tarents)
        if len(msg) > 0: msg += "\n"
        msg += 'expected %s to have %d entries, got %d:\n%s' % \
            (tar, expected, got, pformat(tarents))
        self.assertEqual(expected, got, msg)
        return th, tarents

    def assertStandardTar(self, tar, top):
        """Assert the tar holds exactly top, top/a, top/subdir, top/subdir/b."""
        th, entries = self.assertNumTarEnts(tar, 4)
        entries.sort(key=lambda entry: entry.name)
        self.assertEqual(entries[0].name, top)
        self.assertEqual(entries[1].name, top + '/a')
        self.assertEqual(entries[2].name, top + '/subdir')
        self.assertEqual(entries[3].name, top + '/subdir/b')
        return th

    def assertSubdirTar(self, tar, top):
        """Assert the tar holds exactly top and top/b."""
        th, entries = self.assertNumTarEnts(tar, 2)
        entries.sort(key=lambda entry: entry.name)
        self.assertEqual(entries[0].name, top)
        self.assertEqual(entries[1].name, top + '/b')
        return th

    def checkTar(self, tar, tarbasename, toptardir=None, tarchecker=None):
        """
        Check name and contents of the tar file *tar* inside self.outdir.

        toptardir defaults to tarbasename, tarchecker to
        assertStandardTar.  Returns whatever the checker returns.
        """
        if not toptardir:
            toptardir = tarbasename
        if not tarchecker:
            tarchecker = self.assertStandardTar

        self.assertEqual(tar, '%s.tar' % tarbasename)
        tarpath = os.path.join(self.outdir, tar)
        return tarchecker(tarpath, toptardir)

    def assertTarOnly(self, tarbasename, **kwargs):
        """Assert self.outdir contains a single entry, the expected tar."""
        dirents = self.assertNumDirents(self.outdir, 1)
        return self.checkTar(dirents[0], tarbasename, **kwargs)

    def assertTarAndDir(self, tarbasename, dirname=None, **kwargs):
        """Assert self.outdir contains the expected tar and one directory."""
        if not dirname:
            dirname = tarbasename

        dirents = self.assertNumDirents(self.outdir, 2)
        pprint(dirents)

        # os.listdir() order is arbitrary, so the tar may be either entry.
        if dirents[0].endswith('.tar'):
            tar = dirents[0]
            wd = dirents[1]
        elif dirents[1].endswith('.tar'):
            tar = dirents[1]
            wd = dirents[0]
        else:
            self.fail('no .tar found in ' + self.outdir)

        self.assertEqual(wd, dirname)
        self.assertTrue(os.path.isdir(os.path.join(self.outdir, wd)),
                        dirname + ' should be directory')

        return self.checkTar(tar, tarbasename, **kwargs)

    def assertTarMemberContains(self, th, tarmember, contents):
        """Assert member *tarmember* of open tar *th* equals *contents*."""
        f = th.extractfile(tarmember)
        # read() rather than "\n".join(readlines()): the lines already
        # end in newlines, so joining them with "\n" would double them.
        self.assertEqual(contents, f.read())

    def assertRanInitialClone(self, logpath, loglines):
        """Assert the log shows an initial clone and no cache update."""
        self._find(logpath, loglines, self.initial_clone_command,
                   self.update_cache_command)

    def assertRanUpdate(self, logpath, loglines):
        """Assert the log shows a cache update and no initial clone."""
        self._find(logpath, loglines, self.update_cache_command,
                   self.initial_clone_command)

    def _find(self, logpath, loglines, should_find, should_not_find):
        """
        Assert that some line starts with a match for *should_find* and
        that no line contains a match for *should_not_find*.
        """
        print("#### %s" % should_find)
        found = False
        regexp = re.compile('^' + should_find)
        # The messages embed the whole log, so build each only once.
        whole_log = "".join(loglines)
        unexpected_msg = \
            "Shouldn't find /%s/ in %s; log was:\n" \
            "----\n%s\n----\n" \
            % (should_not_find, logpath, whole_log)
        for line in loglines:
            self.assertNotRegexpMatches(line, should_not_find, unexpected_msg)
            if regexp.search(line):
                found = True
        msg = \
            "Didn't find /%s/ in %s; log was:\n" \
            "----\n%s\n----\n" \
            % (regexp.pattern, logpath, whole_log)
        self.assertTrue(found, msg)
++++++ testenv.py ++++++
#!/usr/bin/python
import os
import shutil
from utils import mkfreshdir, run_cmd
from scmlogs import ScmInvocationLogs
class TestEnvironment:
tests_dir = os.path.abspath(os.path.dirname(__file__)) # os.getcwd()
tmp_dir = tests_dir + '/tmp'
is_setup = False
@classmethod
def tar_scm_bin(cls):
tar_scm = cls.tests_dir + '/tar_scm'
if not os.path.isfile(tar_scm):
raise RuntimeError, "Failed to find tar_scm executable at " +
tar_scm
return tar_scm
@classmethod
def setupClass(cls):
# deliberately not setUpClass - we emulate the behaviour
# to support Python < 2.7
if cls.is_setup:
return
print "++++++ setupClass ++++++"
ScmInvocationLogs.setup_bin_wrapper(cls.scm, cls.tmp_dir)
os.putenv('DEBUG_TAR_SCM', 'yes')
cls.is_setup = True
def calcPaths(self):
if not self._testMethodName.startswith('test_'):
raise RuntimeError, "unexpected test method name: " +
self._testMethodName
self.test_name = self._testMethodName[5:]
self.test_dir = os.path.join(self.tmp_dir, self.scm, self.test_name)
self.pkgdir = os.path.join(self.test_dir, 'pkg')
self.outdir = os.path.join(self.test_dir, 'out')
self.cachedir = os.path.join(self.test_dir, 'cache')
def setUp(self):
self.setupClass()
print "++++++ setUp ++++++"
self.calcPaths()
self.scmlogs = ScmInvocationLogs(self.scm, self.test_dir)
self.scmlogs.next('fixtures')
self.initDirs()
self.fixtures = self.fixtures_class(self.test_dir, self.scmlogs)
self.fixtures.setup()
self.scmlogs.next('start-test')
self.scmlogs.annotate('Starting %s test' % self.test_name)
os.putenv('CACHEDIRECTORY', self.cachedir)
# osc launches source services with cwd as pkg dir
os.chdir(self.pkgdir)
def initDirs(self):
# pkgdir persists between tests to simulate real world use
# (although a test can choose to invoke mkfreshdir)
if not os.path.exists(self.pkgdir):
os.makedirs(self.pkgdir)
for subdir in ('repo', 'repourl', 'incoming'):
mkfreshdir(os.path.join(self.cachedir, subdir))
def disableCache(self):
os.unsetenv('CACHEDIRECTORY')
def tearDown(self):
print "++++++ tearDown ++++++"
self.postRun()
def postRun(self):
print "++++++ postRun +++++++"
self.service = { 'mode' : 'disabled' }
if os.path.exists(self.outdir):
self.simulate_osc_postrun()
def simulate_osc_postrun(self):
"""
Simulate osc copying files from temporary --outdir back to
package source directory, so our tests can catch any
potential side-effects due to the persistent nature of the
package source directory.
"""
temp_dir = self.outdir
dir = self.pkgdir
service = self.service
# This code copied straight out of osc/core.py Serviceinfo.execute():
if service['mode'] == "disabled" or service['mode'] == "trylocal" or
service['mode'] == "localonly" or callmode == "local" or callmode == "trylocal":
for filename in os.listdir(temp_dir):
shutil.move( os.path.join(temp_dir, filename),
os.path.join(dir, filename) )
else:
for filename in os.listdir(temp_dir):
shutil.move( os.path.join(temp_dir, filename),
os.path.join(dir, "_service:"+name+":"+filename) )
def tar_scm_std(self, *args, **kwargs):
return self.tar_scm(self.stdargs(*args), **kwargs)
def tar_scm_std_fail(self, *args):
return self.tar_scm(self.stdargs(*args), should_succeed=False)
def stdargs(self, *args):
return [ '--url', self.fixtures.repo_url, '--scm', self.scm ] +
list(args)
def tar_scm(self, args, should_succeed=True):
# simulate new temporary outdir for each tar_scm invocation
mkfreshdir(self.outdir)
cmdargs = args + [ '--outdir', self.outdir ]
quotedargs = [ "'%s'" % arg for arg in cmdargs ]
cmdstr = 'bash %s %s 2>&1' % (self.tar_scm_bin(), " ".join(quotedargs))
print "\n"
print "-" * 70
print "Running", cmdstr
(stdout, stderr, ret) = run_cmd(cmdstr)
if stdout:
print "STDOUT:"
print "------"
print stdout,
if stderr:
print "STDERR:"
print "------"
print stderr,
print "-" * 70
succeeded = ret == 0
self.assertEqual(succeeded, should_succeed)
return (stdout, stderr, ret)
def rev(self, rev):
return self.fixtures.revs[rev]
def timestamps(self, rev):
return self.fixtures.timestamps[rev]
def sha1s(self, rev):
return self.fixtures.sha1s[rev]
++++++ utils.py ++++++
#!/usr/bin/python
import os
import re
import shutil
import subprocess
def mkfreshdir(path):
    """
    Ensure *path* exists as an empty directory, deleting it first if
    it is already there.

    As a safety net the path must contain a '/tmp' component preceded
    by at least ten characters; otherwise RuntimeError is raised and
    nothing is touched.

    Side effect: the current working directory is changed to '/' and
    not restored.
    """
    if not re.search('.{10}/tmp(/|$)', path):
        raise RuntimeError('unsafe call: mkfreshdir(%s)' % path)

    # Step out of the tree before it is removed.  The previous code also
    # fetched os.getcwd() here without ever using it; that call was
    # dropped because it raises when the cwd no longer exists.
    os.chdir('/')
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path)
def run_cmd(cmd):
    """
    Run *cmd* through the shell, capturing its output.

    Returns a (stdout, stderr, returncode) tuple.
    """
    proc = subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    out, err = proc.communicate()
    return (out, err, proc.returncode)
def quietrun(cmd):
(stdout, stderr, ret) = run_cmd(cmd)
if ret != 0:
print cmd, " failed!"
print stdout
print stderr
return (stdout, stderr, ret)
def run_scm(scm, repo, opts):
    """
    Run ``<scm> <opts>`` after changing into *repo*.

    Returns the (stdout, stderr, returncode) tuple from quietrun().
    """
    #return subprocess.check_output(cmd, shell=True)
    return quietrun('cd %s && %s %s' % (repo, scm, opts))
def run_git(repo, opts):
    """Run ``git <opts>`` inside *repo* via run_scm()."""
    scm = 'git'
    return run_scm(scm, repo, opts)
def run_svn(repo, opts):
    """Run ``svn <opts>`` inside *repo* via run_scm()."""
    scm = 'svn'
    return run_scm(scm, repo, opts)
def run_hg(repo, opts):
    """Run ``hg <opts>`` inside *repo* via run_scm()."""
    scm = 'hg'
    return run_scm(scm, repo, opts)
def run_bzr(repo, opts):
    """Run ``bzr <opts>`` inside *repo* via run_scm()."""
    scm = 'bzr'
    return run_scm(scm, repo, opts)
--
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]