Since there are many ways to manage repository storage, split out a repo
storage framework. The HardlinkQuarantineRepoStorage class implements
the existing default behavior, and the InplaceRepoStorage class
implements the legacy behavior (when sync-allow-hardlinks is disabled in
repos.conf).

Each class implements RepoStorageInterface, which uses coroutine methods
since coroutines are well-suited to the I/O bound tasks that these
methods perform. The _sync_decorator is used to convert coroutine
methods to synchronous methods, for smooth integration into the
surrounding synchronous code.

Bug: https://bugs.gentoo.org/662070
---
 lib/portage/repository/storage/__init__.py         |  0
 .../repository/storage/hardlink_quarantine.py      | 95 ++++++++++++++++++++++
 lib/portage/repository/storage/inplace.py          | 49 +++++++++++
 lib/portage/repository/storage/interface.py        | 87 ++++++++++++++++++++
 lib/portage/sync/controller.py                     |  1 +
 lib/portage/sync/modules/rsync/rsync.py            | 85 +++++--------------
 lib/portage/sync/syncbase.py                       | 31 +++++++
 7 files changed, 284 insertions(+), 64 deletions(-)
 create mode 100644 lib/portage/repository/storage/__init__.py
 create mode 100644 lib/portage/repository/storage/hardlink_quarantine.py
 create mode 100644 lib/portage/repository/storage/inplace.py
 create mode 100644 lib/portage/repository/storage/interface.py

diff --git a/lib/portage/repository/storage/__init__.py 
b/lib/portage/repository/storage/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/lib/portage/repository/storage/hardlink_quarantine.py 
b/lib/portage/repository/storage/hardlink_quarantine.py
new file mode 100644
index 000000000..7e9cf4493
--- /dev/null
+++ b/lib/portage/repository/storage/hardlink_quarantine.py
@@ -0,0 +1,95 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.repository.storage.interface import (
+       RepoStorageException,
+       RepoStorageInterface,
+)
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+       coroutine,
+       coroutine_return,
+)
+
+from _emerge.SpawnProcess import SpawnProcess
+
+
+class HardlinkQuarantineRepoStorage(RepoStorageInterface):
+       """
+       This is the default storage module, since it's quite compatible with
+       most configurations.
+
+       It's desirable to be able to create shared hardlinks between the
+       download directory and the normal repository, and this is facilitated
+       by making the download directory be a subdirectory of the normal
+       repository location (ensuring that no mountpoints are crossed).
+       Shared hardlinks are created by using the rsync --link-dest option.
+
+       Since the download is initially unverified, it is safest to save
+       it in a quarantine directory. The quarantine directory is also
+       useful for making the repository update more atomic, so that it is
+       less likely that the normal repository location will be observed in
+       a partially synced state.
+       """
+       def __init__(self, repo, spawn_kwargs):
+               self._user_location = repo.location
+               self._update_location = None
+               self._spawn_kwargs = spawn_kwargs
+               self._current_update = None
+
+       @coroutine
+       def _check_call(self, cmd):
+               """
+               Run cmd and raise RepoStorageException on failure.
+
+               @param cmd: command to execute
+               @type cmd: list
+               """
+               p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), 
**self._spawn_kwargs)
+               p.start()
+               if (yield p.async_wait()) != os.EX_OK:
+                       raise RepoStorageException('command exited with status 
{}: {}'.\
+                               format(p.returncode, ' '.join(cmd)))
+
+       @coroutine
+       def init_update(self):
+               update_location = os.path.join(self._user_location, 
'.tmp-unverified-download-quarantine')
+               yield self._check_call(['rm', '-rf', update_location])
+
+               # Use rsync --link-dest to hardlink files into 
self._update_location,
+               # since cp -l is not portable.
+               yield self._check_call(['rsync', '-a', '--link-dest', 
self._user_location,
+                       '--exclude', 
'/{}'.format(os.path.basename(update_location)),
+                       self._user_location + '/', update_location + '/'])
+
+               self._update_location = update_location
+
+               coroutine_return(self._update_location)
+
+       @property
+       def current_update(self):
+               if self._update_location is None:
+                       raise RepoStorageException('current update does not 
exist')
+               return self._update_location
+
+       @coroutine
+       def commit_update(self):
+               update_location = self.current_update
+               self._update_location = None
+               yield self._check_call(['rsync', '-a', '--delete',
+                       '--exclude', 
'/{}'.format(os.path.basename(update_location)),
+                       update_location + '/', self._user_location + '/'])
+
+               yield self._check_call(['rm', '-rf', update_location])
+
+       @coroutine
+       def abort_update(self):
+               if self._update_location is not None:
+                       update_location = self._update_location
+                       self._update_location = None
+                       yield self._check_call(['rm', '-rf', update_location])
+
+       @coroutine
+       def garbage_collection(self):
+               yield self.abort_update()
diff --git a/lib/portage/repository/storage/inplace.py 
b/lib/portage/repository/storage/inplace.py
new file mode 100644
index 000000000..f1117ad03
--- /dev/null
+++ b/lib/portage/repository/storage/inplace.py
@@ -0,0 +1,49 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.repository.storage.interface import (
+       RepoStorageException,
+       RepoStorageInterface,
+)
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+
+
+class InplaceRepoStorage(RepoStorageInterface):
+       """
+       Legacy repo storage behavior, where updates are applied in-place.
+       This module is not recommended, since the repository is left in an
+       unspecified (possibly malicious) state if the update fails.
+       """
+       def __init__(self, repo, spawn_kwargs):
+               self._user_location = repo.location
+               self._update_location = None
+
+       @coroutine
+       def init_update(self):
+               self._update_location = self._user_location
+               coroutine_return(self._update_location)
+               yield None
+
+       @property
+       def current_update(self):
+               if self._update_location is None:
+                       raise RepoStorageException('current update does not 
exist')
+               return self._update_location
+
+       @coroutine
+       def commit_update(self):
+               self.current_update
+               self._update_location = None
+               coroutine_return()
+               yield None
+
+       @coroutine
+       def abort_update(self):
+               self._update_location = None
+               coroutine_return()
+               yield None
+
+       @coroutine
+       def garbage_collection(self):
+               coroutine_return()
+               yield None
diff --git a/lib/portage/repository/storage/interface.py 
b/lib/portage/repository/storage/interface.py
new file mode 100644
index 000000000..f83c42b84
--- /dev/null
+++ b/lib/portage/repository/storage/interface.py
@@ -0,0 +1,87 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.exception import PortageException
+from portage.util.futures.compat_coroutine import coroutine
+
+
+class RepoStorageException(PortageException):
+       """
+       Base class for exceptions raised by RepoStorageInterface.
+       """
+
+
+class RepoStorageInterface(object):
+       """
+       Abstract repository storage interface.
+
+       Implementations can assume that the repo.location directory already
+       exists with appropriate permissions (SyncManager handles this).
+
+       TODO: Add a method to check for a previous uncommitted update, which
+       typically indicates a verification failure:
+           https://bugs.gentoo.org/662386
+       """
+       def __init__(self, repo, spawn_kwargs):
+               """
+               @param repo: repository configuration
+               @type repo: portage.repository.config.RepoConfig
+               @param spawn_kwargs: keyword arguments supported by the
+                       portage.process.spawn function
+               @type spawn_kwargs: dict
+               """
+               raise NotImplementedError
+
+       @coroutine
+       def init_update(self):
+               """
+               Create an update directory as a destination to sync updates to.
+               The directory will be populated with files from the previous
+               immutable snapshot, if available. Note that this directory
+               may contain hardlinks that reference files in the previous
+               immutable snapshot, so these files should not be modified
+               (tools like rsync and git normally break hardlinks when
+               files need to be modified).
+
+               @rtype: str
+               @return: path of directory to update, populated with files from
+                       the previous snapshot if available
+               """
+               raise NotImplementedError
+
+       @property
+       def current_update(self):
+               """
+               Get the current update directory which would have been returned
+               from the most recent call to the init_update method. This raises
+               RepoStorageException if the init_update method has not been
+               called.
+
+               @rtype: str
+               @return: path of directory to update
+               """
+               raise NotImplementedError
+
+       @coroutine
+       def commit_update(self):
+               """
+               Commit the current update directory, so that it becomes the
+               latest immutable snapshot.
+               """
+               raise NotImplementedError
+
+       @coroutine
+       def abort_update(self):
+               """
+               Delete the current update directory. If there was not an update
+               in progress, or it has already been committed, then this has
+               no effect.
+               """
+               raise NotImplementedError
+
+       @coroutine
+       def garbage_collection(self):
+               """
+               Remove expired snapshots.
+               """
+               raise NotImplementedError
diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py
index 3bccf6f74..bf5750f7f 100644
--- a/lib/portage/sync/controller.py
+++ b/lib/portage/sync/controller.py
@@ -327,6 +327,7 @@ class SyncManager(object):
                # override the defaults when sync_umask is set
                if repo.sync_umask is not None:
                        spawn_kwargs["umask"] = int(repo.sync_umask, 8)
+               spawn_kwargs.setdefault("umask", 0o022)
                self.spawn_kwargs = spawn_kwargs
 
                if self.usersync_uid is not None:
diff --git a/lib/portage/sync/modules/rsync/rsync.py 
b/lib/portage/sync/modules/rsync/rsync.py
index 56e38631e..17b1b9e7b 100644
--- a/lib/portage/sync/modules/rsync/rsync.py
+++ b/lib/portage/sync/modules/rsync/rsync.py
@@ -59,55 +59,6 @@ class RsyncSync(NewBase):
        def __init__(self):
                NewBase.__init__(self, "rsync", RSYNC_PACKAGE_ATOM)
 
-       def _select_download_dir(self):
-               '''
-               Select and return the download directory. It's desirable to be 
able
-               to create shared hardlinks between the download directory to the
-               normal repository, and this is facilitated by making the 
download
-               directory be a subdirectory of the normal repository location
-               (ensuring that no mountpoints are crossed). Shared hardlinks are
-               created by using the rsync --link-dest option.
-
-               Since the download is initially unverified, it is safest to save
-               it in a quarantine directory. The quarantine directory is also
-               useful for making the repository update more atomic, so that it
-               less likely that normal repository location will be observed in
-               a partially synced state.
-
-               This method returns a quarantine directory if 
sync-allow-hardlinks
-               is enabled in repos.conf, and otherwise it returne the normal
-               repository location.
-               '''
-               if self.repo.sync_allow_hardlinks:
-                       return os.path.join(self.repo.location, 
'.tmp-unverified-download-quarantine')
-               else:
-                       return self.repo.location
-
-       def _commit_download(self, download_dir):
-               '''
-               Commit changes from download_dir if it does not refer to the
-               normal repository location.
-               '''
-               exitcode = 0
-               if self.repo.location != download_dir:
-                       rsynccommand = [self.bin_command] + self.rsync_opts + 
self.extra_rsync_opts
-                       rsynccommand.append('--exclude=/%s' % 
os.path.basename(download_dir))
-                       rsynccommand.append('%s/' % download_dir.rstrip('/'))
-                       rsynccommand.append('%s/' % self.repo.location)
-                       exitcode = portage.process.spawn(rsynccommand, 
**self.spawn_kwargs)
-
-               return exitcode
-
-       def _remove_download(self, download_dir):
-               """
-               Remove download_dir if it does not refer to the normal 
repository
-               location.
-               """
-               exitcode = 0
-               if self.repo.location != download_dir:
-                       exitcode = subprocess.call(['rm', '-rf', download_dir])
-               return exitcode
-
        def update(self):
                '''Internal update function which performs the transfer'''
                opts = self.options.get('emerge_config').opts
@@ -143,8 +94,8 @@ class RsyncSync(NewBase):
                        self.extra_rsync_opts.extend(portage.util.shlex_split(
                                
self.repo.module_specific_options['sync-rsync-extra-opts']))
 
-               download_dir = self._select_download_dir()
                exitcode = 0
+               verify_failure = False
 
                # Process GLEP74 verification options.
                # Default verification to 'no'; it's enabled for ::gentoo
@@ -240,10 +191,14 @@ class RsyncSync(NewBase):
                                self.proto = "file"
                                dosyncuri = syncuri[7:]
                                unchanged, is_synced, exitcode, updatecache_flg 
= self._do_rsync(
-                                       dosyncuri, timestamp, opts, 
download_dir)
+                                       dosyncuri, timestamp, opts)
                                self._process_exitcode(exitcode, dosyncuri, 
out, 1)
-                               if exitcode == 0 and not unchanged:
-                                       self._commit_download(download_dir)
+                               if exitcode == 0:
+                                       if unchanged:
+                                               self.repo_storage.abort_update()
+                                       else:
+                                               
self.repo_storage.commit_update()
+                                               
self.repo_storage.garbage_collection()
                                return (exitcode, updatecache_flg)
 
                        retries=0
@@ -375,7 +330,7 @@ class RsyncSync(NewBase):
                                        dosyncuri = dosyncuri[6:].replace('/', 
':/', 1)
 
                                unchanged, is_synced, exitcode, updatecache_flg 
= self._do_rsync(
-                                       dosyncuri, timestamp, opts, 
download_dir)
+                                       dosyncuri, timestamp, opts)
                                if not unchanged:
                                        local_state_unchanged = False
                                if is_synced:
@@ -390,6 +345,7 @@ class RsyncSync(NewBase):
                                        # exit loop
                                        exitcode = EXCEEDED_MAX_RETRIES
                                        break
+
                        self._process_exitcode(exitcode, dosyncuri, out, 
maxretries)
 
                        if local_state_unchanged:
@@ -397,6 +353,8 @@ class RsyncSync(NewBase):
                                # in this case, so refer gemato to the normal 
repository
                                # location.
                                download_dir = self.repo.location
+                       else:
+                               download_dir = self.download_dir
 
                        # if synced successfully, verify now
                        if exitcode == 0 and self.verify_metamanifest:
@@ -448,14 +406,18 @@ class RsyncSync(NewBase):
                                                                % (e,),
                                                                
level=logging.ERROR, noiselevel=-1)
                                                exitcode = 1
+                                               verify_failure = True
 
                        if exitcode == 0 and not local_state_unchanged:
-                               exitcode = self._commit_download(download_dir)
+                               self.repo_storage.commit_update()
+                               self.repo_storage.garbage_collection()
 
                        return (exitcode, updatecache_flg)
                finally:
-                       if exitcode == 0:
-                               self._remove_download(download_dir)
+                       # Don't delete the update if verification failed, in 
case
+                       # the cause needs to be investigated.
+                       if not verify_failure:
+                               self.repo_storage.abort_update()
                        if openpgp_env is not None:
                                openpgp_env.close()
 
@@ -594,7 +556,7 @@ class RsyncSync(NewBase):
                return rsync_opts
 
 
-       def _do_rsync(self, syncuri, timestamp, opts, download_dir):
+       def _do_rsync(self, syncuri, timestamp, opts):
                updatecache_flg = False
                is_synced = False
                if timestamp != 0 and "--quiet" not in opts:
@@ -720,11 +682,6 @@ class RsyncSync(NewBase):
                                # actual sync
                                command = rsynccommand[:]
 
-                               if self.repo.location != download_dir:
-                                       # Use shared hardlinks for files that 
are identical
-                                       # in the previous snapshot of the 
repository.
-                                       command.append('--link-dest=%s' % 
self.repo.location)
-
                                submodule_paths = self._get_submodule_paths()
                                if submodule_paths:
                                        # The only way to select multiple 
directories to
@@ -738,7 +695,7 @@ class RsyncSync(NewBase):
                                else:
                                        command.append(syncuri + "/")
 
-                               command.append(download_dir)
+                               command.append(self.download_dir)
 
                                exitcode = None
                                try:
diff --git a/lib/portage/sync/syncbase.py b/lib/portage/sync/syncbase.py
index ce69a4fc0..1d2a00b7c 100644
--- a/lib/portage/sync/syncbase.py
+++ b/lib/portage/sync/syncbase.py
@@ -15,6 +15,7 @@ import portage
 from portage.util import writemsg_level
 from portage.util._eventloop.global_event_loop import global_event_loop
 from portage.util.backoff import RandomExponentialBackoff
+from portage.util.futures._sync_decorator import _sync_methods
 from portage.util.futures.retry import retry
 from portage.util.futures.executor.fork import ForkExecutor
 from . import _SUBMODULE_PATH_MAP
@@ -40,6 +41,8 @@ class SyncBase(object):
                self.repo = None
                self.xterm_titles = None
                self.spawn_kwargs = None
+               self.repo_storage = None
+               self._download_dir = None
                self.bin_command = None
                self._bin_command = bin_command
                self.bin_pkg = bin_pkg
@@ -72,7 +75,35 @@ class SyncBase(object):
                self.repo = self.options.get('repo', None)
                self.xterm_titles = self.options.get('xterm_titles', False)
                self.spawn_kwargs = self.options.get('spawn_kwargs', None)
+               storage_cls = portage.load_mod(self._select_storage_module())
+               self.repo_storage = _sync_methods(storage_cls(self.repo, 
self.spawn_kwargs))
 
+       def _select_storage_module(self):
+               '''
+               Select an appropriate implementation of RepoStorageInterface, 
based
+               on repos.conf settings.
+
+               @rtype: str
+               @return: name of the selected repo storage constructor
+               '''
+               if self.repo.sync_allow_hardlinks:
+                       mod_name = 
'portage.repository.storage.hardlink_quarantine.HardlinkQuarantineRepoStorage'
+               else:
+                       mod_name = 
'portage.repository.storage.inplace.InplaceRepoStorage'
+               return mod_name
+
+       @property
+       def download_dir(self):
+               """
+               Get the path of the download directory, where the repository
+               update is staged. The directory is initialized lazily, since
+               the repository might already be at the latest revision, and
+               there may be some cost associated with the directory
+               initialization.
+               """
+               if self._download_dir is None:
+                       self._download_dir = self.repo_storage.init_update()
+               return self._download_dir
 
        def exists(self, **kwargs):
                '''Tests whether the repo actually exists'''
-- 
2.16.4


Reply via email to