commit:     496ff326dc18890889d1ea5d2aec590394635960
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Aug 10 07:42:51 2015 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Thu Aug 13 19:49:39 2015 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=496ff326

sync repositories in parallel (bug 557426)

Repos will now be synced in parallel (including their post-sync hooks),
but a given repo will only be synced after its master(s) have synced (in
case that matters for hooks). Output of concurrent processes will be mixed
(irrelevant with --quiet). Support for FEATURES=metadata-transfer will be
handled in the main process, which may be required for some backends (such
as sqlite).

X-Gentoo-Bug: 557426
X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=557426
Acked-by: Brian Dolbec <dolsen <AT> gentoo.org>

 pym/portage/emaint/modules/sync/sync.py   | 129 ++++++++++++++++++++++++++++--
 pym/portage/sync/controller.py            |  31 +++++--
 pym/portage/tests/sync/test_sync_local.py |   6 +-
 pym/portage/util/_async/AsyncFunction.py  |  67 ++++++++++++++++
 4 files changed, 219 insertions(+), 14 deletions(-)

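The scheduling described in the commit message (sync repos concurrently, but hold each repo back until its masters have finished) amounts to repeatedly pulling leaf nodes out of a dependency graph, which is what the SyncScheduler class below does with portage.util.digraph. A minimal standalone sketch of that ordering, with hypothetical names and threads standing in for portage's forked sync processes, not portage's API:

    from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

    def parallel_sync(repos, masters, sync_one, max_jobs=2):
        # repos: iterable of repo names; masters: dict mapping a repo name to
        # its master names; sync_one(name) performs one sync and returns a code.
        # Only wait on masters that are themselves being synced (a simplification).
        pending = {r: set(masters.get(r, ())) & set(repos) for r in repos}
        running, results = {}, {}
        with ThreadPoolExecutor(max_workers=max_jobs) as pool:
            while pending or running:
                # "Leaf nodes": repos whose masters have all finished syncing.
                ready = [r for r, deps in pending.items() if deps.issubset(results)]
                if not ready and not running:
                    # Circular master relationship: pick one node to break the
                    # cycle, mirroring what _update_leaf_nodes() does.
                    ready = [next(iter(pending))]
                for r in ready:
                    pending.pop(r)
                    running[pool.submit(sync_one, r)] = r
                done, _ = wait(running, return_when=FIRST_COMPLETED)
                for future in done:
                    results[running.pop(future)] = future.result()
        return results

For example, parallel_sync(["gentoo", "overlay"], {"overlay": {"gentoo"}}, do_sync) would sync "gentoo" before "overlay", while unrelated repos sync side by side.
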
diff --git a/pym/portage/emaint/modules/sync/sync.py b/pym/portage/emaint/modules/sync/sync.py
index b463073..879d0f0 100644
--- a/pym/portage/emaint/modules/sync/sync.py
+++ b/pym/portage/emaint/modules/sync/sync.py
@@ -13,6 +13,10 @@ from portage.output import bold, red, create_color_func
 from portage._global_updates import _global_updates
 from portage.sync.controller import SyncManager
 from portage.util import writemsg_level
+from portage.util.digraph import digraph
+from portage.util._async.AsyncScheduler import AsyncScheduler
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util._eventloop.EventLoop import EventLoop
 
 import _emerge
 from _emerge.emergelog import emergelog
@@ -201,6 +205,7 @@ class SyncRepos(object):
                                        k = "--" + k.replace("_", "-")
                                        self.emerge_config.opts[k] = v
 
+               selected_repos = [repo for repo in selected_repos if repo.sync_type is not None]
                msgs = []
                if not selected_repos:
                        msgs.append("Emaint sync, nothing to sync... returning")
@@ -213,13 +218,20 @@ class SyncRepos(object):
 
                sync_manager = SyncManager(
                        self.emerge_config.target_config.settings, emergelog)
-               retvals = []
-               for repo in selected_repos:
-                       if repo.sync_type is not None:
-                               returncode, message = sync_manager.sync(self.emerge_config, repo)
-                               retvals.append((repo.name, returncode))
-                               if message:
-                                       msgs.append(message)
+
+               max_jobs = (self.emerge_config.opts.get('--jobs', 1)
+                       if 'parallel-fetch' in self.emerge_config.
+                       target_config.settings.features else 1)
+               sync_scheduler = SyncScheduler(emerge_config=self.emerge_config,
+                       selected_repos=selected_repos, sync_manager=sync_manager,
+                       max_jobs=max_jobs,
+                       event_loop=global_event_loop() if portage._internal_caller else
+                               EventLoop(main=False))
+
+               sync_scheduler.start()
+               sync_scheduler.wait()
+               retvals = sync_scheduler.retvals
+               msgs.extend(sync_scheduler.msgs)
 
                # Reload the whole config.
                portage._sync_mode = False
@@ -287,3 +299,106 @@ class SyncRepos(object):
                        messages.append("Action: %s for repo: %s, returned code = %s"
                                % (action, rval[0], rval[1]))
                return messages
+
+
+class SyncScheduler(AsyncScheduler):
+       '''
+       Sync repos in parallel, but don't sync a given repo until all
+       of its masters have synced.
+       '''
+       def __init__(self, **kwargs):
+               '''
+               @param emerge_config: an emerge_config instance
+               @param selected_repos: list of RepoConfig instances
+               @param sync_manager: a SyncManager instance
+               '''
+               self._emerge_config = kwargs.pop('emerge_config')
+               self._selected_repos = kwargs.pop('selected_repos')
+               self._sync_manager = kwargs.pop('sync_manager')
+               AsyncScheduler.__init__(self, **kwargs)
+               self._init_graph()
+               self.retvals = []
+               self.msgs = []
+
+       def _init_graph(self):
+               '''
+               Graph relationships between repos and their masters.
+               '''
+               self._sync_graph = digraph()
+               self._leaf_nodes = []
+               self._repo_map = {}
+               self._running_repos = set()
+               for repo in self._selected_repos:
+                       self._repo_map[repo.name] = repo
+                       self._sync_graph.add(repo.name, None)
+                       for master in repo.masters:
+                               self._repo_map[master.name] = master
+                               self._sync_graph.add(master.name, repo.name)
+               self._update_leaf_nodes()
+
+       def _task_exit(self, task):
+               '''
+               Remove the task from the graph, in order to expose
+               more leaf nodes.
+               '''
+               self._running_tasks.discard(task)
+               returncode = task.returncode
+               if task.returncode == os.EX_OK:
+                       returncode, message, updatecache_flg = task.result
+                       if message:
+                               self.msgs.append(message)
+               repo = task.kwargs['repo'].name
+               self._running_repos.remove(repo)
+               self.retvals.append((repo, returncode))
+               self._sync_graph.remove(repo)
+               self._update_leaf_nodes()
+               super(SyncScheduler, self)._task_exit(self)
+
+       def _update_leaf_nodes(self):
+               '''
+               Populate self._leaf_nodes with current leaves from
+               self._sync_graph. If a circular master relationship
+               is discovered, choose a random node to break the cycle.
+               '''
+               if self._sync_graph and not self._leaf_nodes:
+                       self._leaf_nodes = [obj for obj in
+                               self._sync_graph.leaf_nodes()
+                               if obj not in self._running_repos]
+
+                       if not (self._leaf_nodes or self._running_repos):
+                               # If there is a circular master relationship,
+                               # choose a random node to break the cycle.
+                               self._leaf_nodes = [next(iter(self._sync_graph))]
+
+       def _next_task(self):
+               '''
+               Return a task for the next available leaf node.
+               '''
+               if not self._sync_graph:
+                       raise StopIteration()
+               # If self._sync_graph is non-empty, then self._leaf_nodes
+               # is guaranteed to be non-empty, since otherwise
+               # _can_add_job would have returned False and prevented
+               # _next_task from being immediately called.
+               node = self._leaf_nodes.pop()
+               self._running_repos.add(node)
+               self._update_leaf_nodes()
+
+               task = self._sync_manager.async(
+                       self._emerge_config, self._repo_map[node])
+               return task
+
+       def _can_add_job(self):
+               '''
+               Returns False if there are no leaf nodes available.
+               '''
+               if not AsyncScheduler._can_add_job(self):
+                       return False
+               return bool(self._leaf_nodes) and not self._terminated.is_set()
+
+       def _keep_scheduling(self):
+               '''
+               Schedule as long as the graph is non-empty, and we haven't
+               been terminated.
+               '''
+               return bool(self._sync_graph) and not self._terminated.is_set()

diff --git a/pym/portage/sync/controller.py b/pym/portage/sync/controller.py
index 307487f..e992cc4 100644
--- a/pym/portage/sync/controller.py
+++ b/pym/portage/sync/controller.py
@@ -21,6 +21,7 @@ bad = create_color_func("BAD")
 warn = create_color_func("WARN")
 from portage.package.ebuild.doebuild import _check_temp_dir
 from portage.metadata import action_metadata
+from portage.util._async.AsyncFunction import AsyncFunction
 from portage import OrderedDict
 from portage import _unicode_decode
 from portage import util
@@ -113,12 +114,18 @@ class SyncManager(object):
                        return desc
                return []
 
+       def async(self, emerge_config=None, repo=None):
+               proc = AsyncFunction(target=self.sync,
+                       kwargs=dict(emerge_config=emerge_config, repo=repo))
+               proc.addExitListener(self._sync_callback)
+               return proc
 
-       def sync(self, emerge_config=None, repo=None, callback=None):
+       def sync(self, emerge_config=None, repo=None):
                self.emerge_config = emerge_config
-               self.callback = callback or self._sync_callback
+               self.callback = None
                self.repo = repo
                self.exitcode = 1
+               self.updatecache_flg = False
                if repo.sync_type in self.module_names:
                        tasks = [self.module_controller.get_class(repo.sync_type)]
                else:
@@ -149,13 +156,14 @@ class SyncManager(object):
 
                self.perform_post_sync_hook(repo.name, repo.sync_uri, repo.location)
 
-               return self.exitcode, None
+               return self.exitcode, None, self.updatecache_flg
 
 
        def do_callback(self, result):
                #print("result:", result, "callback()", self.callback)
                exitcode, updatecache_flg = result
                self.exitcode = exitcode
+               self.updatecache_flg = updatecache_flg
                if exitcode == 0:
                        msg = "=== Sync completed for %s" % self.repo.name
                        self.logger(self.xterm_titles, msg)
@@ -310,17 +318,28 @@ class SyncManager(object):
                os.umask(0o022)
                return os.EX_OK
 
+       def _sync_callback(self, proc):
+               """
+               This is called in the parent process, serially, for each of the
+               sync jobs when they complete. Some cache backends such as sqlite
+               may require that cache access be performed serially in the
+               parent process like this.
+               """
+               repo = proc.kwargs['repo']
+               exitcode = proc.returncode
+               updatecache_flg = False
+               if proc.returncode == os.EX_OK:
+                       exitcode, message, updatecache_flg = proc.result
 
-       def _sync_callback(self, exitcode, updatecache_flg):
                if updatecache_flg and "metadata-transfer" not in self.settings.features:
                        updatecache_flg = False
 
                if updatecache_flg and \
                        os.path.exists(os.path.join(
-                       self.repo.location, 'metadata', 'md5-cache')):
+                       repo.location, 'metadata', 'md5-cache')):
 
                        # Only update cache for repo.location since that's
                        # the only one that's been synced here.
                        action_metadata(self.settings, self.portdb, self.emerge_config.opts,
-                               porttrees=[self.repo.location])
+                               porttrees=[repo.location])
 

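The point of the reworked _sync_callback above is that the sync itself runs in a child process, while the callback and any cache update run serially in the parent after the child reports back, since backends such as sqlite may not tolerate concurrent cache writers. A rough standalone sketch of that shape, using concurrent.futures and hypothetical names rather than portage's process machinery (sync_one must be a module-level function so it can be pickled):

    from concurrent.futures import ProcessPoolExecutor, as_completed

    def sync_all(repos, sync_one, update_cache, max_jobs=2):
        # sync_one(repo) runs in a worker process and returns
        # (exitcode, message, updatecache_flg), like SyncManager.sync().
        results = {}
        with ProcessPoolExecutor(max_workers=max_jobs) as pool:
            futures = {pool.submit(sync_one, repo): repo for repo in repos}
            for future in as_completed(futures):
                repo = futures[future]
                exitcode, message, updatecache_flg = future.result()
                if exitcode == 0 and updatecache_flg:
                    # The cache update stays in the parent, one repo at a time.
                    update_cache(repo)
                results[repo] = (exitcode, message)
        return results

The design choice is the same either way: parallelism for the network-bound sync, a single serial writer for the cache.
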
diff --git a/pym/portage/tests/sync/test_sync_local.py b/pym/portage/tests/sync/test_sync_local.py
index f50caba..7753a26 100644
--- a/pym/portage/tests/sync/test_sync_local.py
+++ b/pym/portage/tests/sync/test_sync_local.py
@@ -55,8 +55,12 @@ class SyncLocalTestCase(TestCase):
                        "dev-libs/A-0": {}
                }
 
+               user_config = {
+                       'make.conf': ('FEATURES="metadata-transfer"',)
+               }
+
                playground = ResolverPlayground(ebuilds=ebuilds,
-                       profile=profile, user_config={}, debug=debug)
+                       profile=profile, user_config=user_config, debug=debug)
                settings = playground.settings
                eprefix = settings["EPREFIX"]
                eroot = settings["EROOT"]

diff --git a/pym/portage/util/_async/AsyncFunction.py b/pym/portage/util/_async/AsyncFunction.py
new file mode 100644
index 0000000..b6142a2
--- /dev/null
+++ b/pym/portage/util/_async/AsyncFunction.py
@@ -0,0 +1,67 @@
+# Copyright 2015 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import pickle
+import traceback
+
+from portage import os
+from portage.util._async.ForkProcess import ForkProcess
+from _emerge.PipeReader import PipeReader
+
+class AsyncFunction(ForkProcess):
+       """
+       Execute a function call in a fork, and retrieve the function
+       return value via pickling/unpickling, accessible as the
+       "result" attribute after the forked process has exited.
+       """
+
+       __slots__ = ('args', 'kwargs', 'result', 'target',
+               '_async_func_reader', '_async_func_reader_pw')
+
+       def _start(self):
+               pr, pw = os.pipe()
+               self.fd_pipes = {}
+               self.fd_pipes[pw] = pw
+               self._async_func_reader_pw = pw
+               self._async_func_reader = PipeReader(
+                       input_files={"input":pr},
+                       scheduler=self.scheduler)
+               self._async_func_reader.addExitListener(self._async_func_reader_exit)
+               self._async_func_reader.start()
+               ForkProcess._start(self)
+               os.close(pw)
+
+       def _run(self):
+               try:
+                       result = self.target(*(self.args or []), **(self.kwargs or {}))
+                       os.write(self._async_func_reader_pw, pickle.dumps(result))
+               except Exception:
+                       traceback.print_exc()
+                       return 1
+
+               return os.EX_OK
+
+       def _pipe_logger_exit(self, pipe_logger):
+               # Ignore this event, since we want to ensure that we exit
+               # only after _async_func_reader_exit has reached EOF.
+               self._pipe_logger = None
+
+       def _async_func_reader_exit(self, pipe_reader):
+               try:
+                       self.result = pickle.loads(pipe_reader.getvalue())
+               except Exception:
+                       # The child process will have printed a traceback in this case,
+                       # and returned an unsuccessful returncode.
+                       pass
+               self._async_func_reader = None
+               self._unregister()
+               self.wait()
+
+       def _unregister(self):
+               ForkProcess._unregister(self)
+
+               pipe_reader = self._async_func_reader
+               if pipe_reader is not None:
+                       self._async_func_reader = None
+                       pipe_reader.removeExitListener(self._async_func_reader_exit)
+                       pipe_reader.cancel()

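AsyncFunction's mechanism is to fork, run the target in the child, and ship the return value back to the parent as a pickle over a pipe; the event-loop plumbing above just makes that non-blocking. A minimal synchronous sketch of the same pattern (hypothetical helper, POSIX only, not portage's API):

    import os
    import pickle
    import traceback

    def run_forked(func, *args, **kwargs):
        pr, pw = os.pipe()
        pid = os.fork()
        if pid == 0:
            # Child: run the target and write its pickled return value to the pipe.
            os.close(pr)
            try:
                os.write(pw, pickle.dumps(func(*args, **kwargs)))
                os._exit(0)
            except Exception:
                traceback.print_exc()
                os._exit(1)
        # Parent: read until EOF, reap the child, then unpickle the result.
        os.close(pw)
        chunks = []
        while True:
            chunk = os.read(pr, 4096)
            if not chunk:
                break
            chunks.append(chunk)
        os.close(pr)
        os.waitpid(pid, 0)
        data = b"".join(chunks)
        if not data:
            raise RuntimeError("child failed; see its traceback above")
        return pickle.loads(data)

For instance, run_forked(sorted, [3, 1, 2]) returns [1, 2, 3] computed in a child process; whatever func returns just has to be picklable, which is the same constraint AsyncFunction places on its "result" attribute.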