commit:     a82dfe797defc1908bd9f97c1118b478994f6444
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Nov 12 00:08:20 2015 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Thu Nov 12 18:54:55 2015 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=a82dfe79
egencache: parallelize --update-changelogs (bug 565540) Use the TaskScheduler class to parallelize GenChangeLogs. Fix AsyncFunction so it does not re-define 'args' in __slots__. X-Gentoo-Bug: 565540 X-Gentoo-Bug-URL: https://bugs.gentoo.org/show_bug.cgi?id=565540 Acked-by: Brian Dolbec <dolsen <AT> gentoo.org> bin/egencache | 24 +++++++++++++++++++----- pym/portage/util/_async/AsyncFunction.py | 5 ++++- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/bin/egencache b/bin/egencache index 51d115a..76eb00b 100755 --- a/bin/egencache +++ b/bin/egencache @@ -55,7 +55,9 @@ from portage.const import TIMESTAMP_FORMAT from portage.manifest import guessManifestFileType from portage.package.ebuild._parallel_manifest.ManifestScheduler import ManifestScheduler from portage.util import cmp_sort_key, writemsg_level +from portage.util._async.AsyncFunction import AsyncFunction from portage.util._async.run_main_scheduler import run_main_scheduler +from portage.util._async.TaskScheduler import TaskScheduler from portage.util._eventloop.global_event_loop import global_event_loop from portage import cpv_getkey from portage.dep import Atom, isjustname @@ -748,7 +750,8 @@ class _special_filename(_filename_base): return self.file_name < other.file_name class GenChangeLogs(object): - def __init__(self, portdb, changelog_output, changelog_reversed): + def __init__(self, portdb, changelog_output, changelog_reversed, + max_jobs=None, max_load=None): self.returncode = os.EX_OK self._portdb = portdb self._wrapper = textwrap.TextWrapper( @@ -758,6 +761,8 @@ class GenChangeLogs(object): ) self._changelog_output = changelog_output self._changelog_reversed = changelog_reversed + self._max_jobs = max_jobs + self._max_load = max_load @staticmethod def grab(cmd): @@ -882,7 +887,7 @@ class GenChangeLogs(object): output.close() - def run(self): + def _task_iter(self): repo_path = self._portdb.porttrees[0] os.chdir(repo_path) @@ -908,7 +913,12 @@ class GenChangeLogs(object): cmod = 0 if 
float(cmod) < float(lmod): - self.generate_changelog(cp) + yield AsyncFunction(target=self.generate_changelog, args=[cp]) + + def run(self): + return run_main_scheduler( + TaskScheduler(self._task_iter(), event_loop=global_event_loop(), + max_jobs=self._max_jobs, max_load=self._max_load)) def egencache_main(args): @@ -1149,8 +1159,12 @@ def egencache_main(args): if options.update_changelogs: gen_clogs = GenChangeLogs(portdb, changelog_output=options.changelog_output, - changelog_reversed=options.changelog_reversed) - gen_clogs.run() + changelog_reversed=options.changelog_reversed, + max_jobs=options.jobs, + max_load=options.load_average) + signum = gen_clogs.run() + if signum is not None: + sys.exit(128 + signum) ret.append(gen_clogs.returncode) if options.write_timestamp: diff --git a/pym/portage/util/_async/AsyncFunction.py b/pym/portage/util/_async/AsyncFunction.py index b6142a2..40f6c5e 100644 --- a/pym/portage/util/_async/AsyncFunction.py +++ b/pym/portage/util/_async/AsyncFunction.py @@ -15,7 +15,10 @@ class AsyncFunction(ForkProcess): "result" attribute after the forked process has exited. """ - __slots__ = ('args', 'kwargs', 'result', 'target', + # NOTE: This class overrides the meaning of the SpawnProcess 'args' + # attribute, and uses it to hold the positional arguments for the + # 'target' function. + __slots__ = ('kwargs', 'result', 'target', '_async_func_reader', '_async_func_reader_pw') def _start(self):
