Since construction of FetchTask instances requires results from
aux_get calls that would trigger event loop recursion when executed
synchronously, add a _fetch_tasks_future function to construct
FetchTask instances asynchronously and return a Future. Use an
_EbuildFetchTasks class to wait for the FetchTask instances to
become available, and then execute them.

Bug: https://bugs.gentoo.org/654038
---
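Note (patch-notes area, not part of the commit message): below is a minimal,
self-contained asyncio sketch of the pattern this patch introduces: build the
task list asynchronously, expose it through a Future, and have the consumer
wait on that Future before executing the tasks. The names make_tasks_future
and main are illustrative only and do not exist in portage; the real code
uses portage's internal event loop, AsyncTaskFuture, and TaskScheduler.

    import asyncio

    def make_tasks_future(loop, cpv):
        # Stand-in for _fetch_tasks_future(): kick off async metadata
        # lookups, then resolve the returned future with a task list.
        result = loop.create_future()

        async def gather_metadata():
            # Placeholder for the async_aux_get / async_fetch_map calls.
            await asyncio.sleep(0)
            return ["fetch distfiles of %s" % cpv]

        def done(gathered):
            if not result.cancelled():
                result.set_result(gathered.result())

        loop.create_task(gather_metadata()).add_done_callback(done)
        return result

    async def main(loop):
        # Stand-in for _EbuildFetchTasks: wait for the task list to
        # become available, then execute it (here, just print).
        for task in await make_tasks_future(loop, "app-misc/foo-1.0"):
            print(task)

    loop = asyncio.new_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()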
 pym/portage/_emirrordist/FetchIterator.py | 311 ++++++++++++++++++++----------
 1 file changed, 206 insertions(+), 105 deletions(-)

diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
index 38419799d..bd3b98cd0 100644
--- a/pym/portage/_emirrordist/FetchIterator.py
+++ b/pym/portage/_emirrordist/FetchIterator.py
@@ -7,14 +7,18 @@ from portage import os
 from portage.checksum import (_apply_hash_filter,
        _filter_unaccelarated_hashes, _hash_filter)
 from portage.dep import use_reduce
-from portage.exception import PortageException
+from portage.exception import PortageException, PortageKeyError
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util.futures.iter_completed import iter_gather
 from .FetchTask import FetchTask
+from _emerge.CompositeTask import CompositeTask
+
 
 class FetchIterator(object):
 
        def __init__(self, config):
                self._config = config
-               self._log_failure = config.log_failure
                self._terminated = threading.Event()
 
        def terminate(self):
@@ -41,9 +45,6 @@ class FetchIterator(object):
 
                portdb = self._config.portdb
                get_repo_for_location = portdb.repositories.get_repo_for_location
-               file_owners = self._config.file_owners
-               file_failures = self._config.file_failures
-               restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
 
                hash_filter = _hash_filter(
                        portdb.settings.get("PORTAGE_CHECKSUM_FILTER", ""))
@@ -59,110 +60,210 @@ class FetchIterator(object):
 
                                # Reset state so the Manifest is pulled once
                                # for this cp / tree combination.
-                               digests = None
                                repo_config = get_repo_for_location(tree)
+                               digests_future = portdb._event_loop.create_future()
 
                                for cpv in portdb.cp_list(cp, mytree=tree):
 
                                        if self._terminated.is_set():
                                                return
 
-                                       try:
-                                               restrict, = portdb.aux_get(cpv, ("RESTRICT",),
-                                                       mytree=tree)
-                                       except (KeyError, PortageException) as e:
-                                               self._log_failure("%s\t\taux_get exception %s" %
-                                                       (cpv, e))
-                                               continue
-
-                                       # Here we use matchnone=True to ignore conditional parts
-                                       # of RESTRICT since they don't apply unconditionally.
-                                       # Assume such conditionals only apply on the client side.
-                                       try:
-                                               restrict = frozenset(use_reduce(restrict,
-                                                       flat=True, matchnone=True))
-                                       except PortageException as e:
-                                               self._log_failure("%s\t\tuse_reduce exception %s" %
-                                                       (cpv, e))
-                                               continue
-
-                                       if "fetch" in restrict:
-                                               continue
-
-                                       try:
-                                               uri_map = portdb.getFetchMap(cpv)
-                                       except PortageException as e:
-                                               self._log_failure("%s\t\tgetFetchMap exception %s" %
-                                                       (cpv, e))
-                                               continue
-
-                                       if not uri_map:
-                                               continue
-
-                                       if "mirror" in restrict:
-                                               skip = False
-                                               if restrict_mirror_exemptions is not None:
-                                                       new_uri_map = {}
-                                                       for filename, uri_tuple in uri_map.items():
-                                                               for uri in uri_tuple:
-                                                                       if uri[:9] == "mirror://":
-                                                                               i = uri.find("/", 9)
-                                                                               if i != -1 and uri[9:i].strip("/") in \
-                                                                                       restrict_mirror_exemptions:
-                                                                                       new_uri_map[filename] = uri_tuple
-                                                                                       break
-                                                       if new_uri_map:
-                                                               uri_map = new_uri_map
-                                                       else:
-                                                               skip = True
-                                               else:
-                                                       skip = True
-
-                                               if skip:
-                                                       continue
-
-                                       # Parse Manifest for this cp if we haven't yet.
-                                       if digests is None:
-                                               try:
-                                                       digests = repo_config.load_manifest(
-                                                               os.path.join(repo_config.location, cp)
-                                                               ).getTypeDigests("DIST")
-                                               except (EnvironmentError, PortageException) as e:
-                                                       for filename in uri_map:
-                                                               self._log_failure(
-                                                                       "%s\t%s\tManifest exception %s" %
-                                                                       (cpv, filename, e))
-                                                               file_failures[filename] = cpv
-                                                       continue
-
-                                       if not digests:
-                                               for filename in uri_map:
-                                                       self._log_failure("%s\t%s\tdigest entry missing" %
-                                                               (cpv, filename))
-                                                       file_failures[filename] = cpv
-                                               continue
-
-                                       for filename, uri_tuple in uri_map.items():
-                                               file_digests = digests.get(filename)
-                                               if file_digests is None:
-                                                       self._log_failure("%s\t%s\tdigest entry missing" %
-                                                               (cpv, filename))
-                                                       file_failures[filename] = cpv
-                                                       continue
-                                               if filename in file_owners:
-                                                       continue
-                                               file_owners[filename] = cpv
-
-                                               file_digests = \
-                                                       _filter_unaccelarated_hashes(file_digests)
-                                               if hash_filter is not None:
-                                                       file_digests = _apply_hash_filter(
-                                                               file_digests, hash_filter)
-
-                                               yield FetchTask(cpv=cpv,
-                                                       background=True,
-                                                       digests=file_digests,
-                                                       distfile=filename,
-                                                       restrict=restrict,
-                                                       uri_tuple=uri_tuple,
-                                                       config=self._config)
+                                       yield _EbuildFetchTasks(
+                                               fetch_tasks_future=_fetch_tasks_future(
+                                                       self._config,
+                                                       hash_filter,
+                                                       repo_config,
+                                                       digests_future,
+                                                       cpv)
+                                       )
+
+
+class _EbuildFetchTasks(CompositeTask):
+       """
+       This executes asynchronously constructed FetchTask instances for
+       each of the files referenced by an ebuild.
+       """
+       __slots__ = ('fetch_tasks_future',)
+       def _start(self):
+               self._start_task(AsyncTaskFuture(future=self.fetch_tasks_future),
+                       self._start_fetch_tasks)
+
+       def _start_fetch_tasks(self, task):
+               if self._default_exit(task) != os.EX_OK:
+                       self._async_wait()
+                       return
+
+               self._start_task(
+                       TaskScheduler(
+                               iter(self.fetch_tasks_future.result()),
+                               max_jobs=1,
+                               event_loop=self.scheduler),
+                       self._default_final_exit)
+
+
+def _fetch_tasks_future(config, hash_filter, repo_config, digests_future, cpv):
+       """
+       Asynchronously construct FetchTask instances for each of the files
+       referenced by an ebuild.
+
+       @param config: emirrordist config
+       @type config: portage._emirrordist.Config.Config
+       @param hash_filter: PORTAGE_CHECKSUM_FILTER settings
+       @type hash_filter: portage.checksum._hash_filter
+       @param repo_config: repository configuration
+       @type repo_config: RepoConfig
+       @param digests_future: future that contains cached distfiles digests
+               for the current cp if available
+       @type digests_future: asyncio.Future
+       @param cpv: current ebuild cpv
+       @type cpv: portage.versions._pkg_str
+
+       @return: A future that results in a list containing FetchTask
+               instances for each of the files referenced by an ebuild.
+       @rtype: asyncio.Future (or compatible)
+       """
+
+       loop = config.portdb._event_loop
+       result = loop.create_future()
+       fetch_tasks = []
+
+       def aux_get_done(gather_result):
+               if result.cancelled():
+                       return
+
+               aux_get_result, fetch_map_result = gather_result.result()
+               try:
+                       restrict, = aux_get_result.result()
+               except (PortageKeyError, PortageException) as e:
+                       config.log_failure("%s\t\taux_get exception %s" %
+                               (cpv, e))
+                       result.set_result(fetch_tasks)
+                       return
+
+               # Here we use matchnone=True to ignore conditional parts
+               # of RESTRICT since they don't apply unconditionally.
+               # Assume such conditionals only apply on the client side.
+               try:
+                       restrict = frozenset(use_reduce(restrict,
+                               flat=True, matchnone=True))
+               except PortageException as e:
+                       config.log_failure("%s\t\tuse_reduce exception %s" %
+                               (cpv, e))
+                       result.set_result(fetch_tasks)
+                       return
+
+               if "fetch" in restrict:
+                       result.set_result(fetch_tasks)
+                       return
+
+               try:
+                       uri_map = fetch_map_result.result()
+               except PortageException as e:
+                       config.log_failure("%s\t\tgetFetchMap exception %s" %
+                               (cpv, e))
+                       result.set_result(fetch_tasks)
+                       return
+
+               if not uri_map:
+                       result.set_result(fetch_tasks)
+                       return
+
+               if "mirror" in restrict:
+                       skip = False
+                       if config.restrict_mirror_exemptions is not None:
+                               new_uri_map = {}
+                               for filename, uri_tuple in uri_map.items():
+                                       for uri in uri_tuple:
+                                               if uri[:9] == "mirror://":
+                                                       i = uri.find("/", 9)
+                                                       if i != -1 and uri[9:i].strip("/") in \
+                                                               config.restrict_mirror_exemptions:
+                                                               new_uri_map[filename] = uri_tuple
+                                                               break
+                               if new_uri_map:
+                                       uri_map = new_uri_map
+                               else:
+                                       skip = True
+                       else:
+                               skip = True
+
+                       if skip:
+                               result.set_result(fetch_tasks)
+                               return
+
+               # Parse Manifest for this cp if we haven't yet.
+               if not digests_future.done():
+                       try:
+                               digests = repo_config.load_manifest(
+                                       os.path.join(repo_config.location, cpv.cp)).\
+                                       getTypeDigests("DIST")
+                       except (EnvironmentError, PortageException) as e:
+                               digests_future.set_exception(e)
+                               for filename in uri_map:
+                                       config.log_failure(
+                                               "%s\t%s\tManifest exception %s" %
+                                               (cpv, filename, e))
+                                       config.file_failures[filename] = cpv
+                               result.set_result(fetch_tasks)
+                               return
+                       else:
+                               digests_future.set_result(digests)
+
+               digests = digests_future.result()
+               if not digests:
+                       for filename in uri_map:
+                               config.log_failure("%s\t%s\tdigest entry missing" %
+                                       (cpv, filename))
+                               config.file_failures[filename] = cpv
+                       result.set_result(fetch_tasks)
+                       return
+
+               for filename, uri_tuple in uri_map.items():
+                       file_digests = digests.get(filename)
+                       if file_digests is None:
+                               config.log_failure("%s\t%s\tdigest entry missing" %
+                                       (cpv, filename))
+                               config.file_failures[filename] = cpv
+                               continue
+                       if filename in config.file_owners:
+                               continue
+                       config.file_owners[filename] = cpv
+
+                       file_digests = \
+                               _filter_unaccelarated_hashes(file_digests)
+                       if hash_filter is not None:
+                               file_digests = _apply_hash_filter(
+                                       file_digests, hash_filter)
+
+                       fetch_tasks.append(FetchTask(
+                               cpv=cpv,
+                               background=True,
+                               digests=file_digests,
+                               distfile=filename,
+                               restrict=restrict,
+                               uri_tuple=uri_tuple,
+                               config=config))
+
+               result.set_result(fetch_tasks)
+
+       def future_generator():
+               yield config.portdb.async_aux_get(cpv, ("RESTRICT",),
+                       myrepo=repo_config.name, loop=loop)
+               yield config.portdb.async_fetch_map(cpv,
+                       mytree=repo_config.location, loop=loop)
+
+       # Use iter_gather(max_jobs=1) to limit the number of processes per
+       # _EbuildFetchTasks instance, and also to avoid spawning two bash
+       # processes for the same cpv simultaneously (the second one can
+       # use metadata cached by the first one).
+       gather_result = iter_gather(
+               future_generator(),
+               max_jobs=1,
+               loop=loop,
+       )
+       gather_result.add_done_callback(aux_get_done)
+       result.add_done_callback(lambda result:
+               gather_result.cancel() if result.cancelled() and
+               not gather_result.done() else None)
+
+       return result
-- 
2.13.6
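
P.S. regarding the max_jobs=1 comment above: a minimal asyncio sketch of
bounded gathering, for illustration only. bounded_gather is a hypothetical
helper, not portage's iter_gather, whose returned future additionally
resolves to the completed future objects themselves rather than to their
results.

    import asyncio

    async def bounded_gather(awaitables, max_jobs):
        # Run at most max_jobs awaitables concurrently; with
        # max_jobs=1 they execute strictly one after another.
        semaphore = asyncio.Semaphore(max_jobs)

        async def run_one(awaitable):
            async with semaphore:
                return await awaitable

        return await asyncio.gather(*(run_one(a) for a in awaitables))

    async def work(name):
        # Placeholder for a metadata lookup such as RESTRICT or SRC_URI.
        await asyncio.sleep(0)
        return name

    loop = asyncio.new_event_loop()
    print(loop.run_until_complete(
        bounded_gather([work("RESTRICT"), work("SRC_URI")], max_jobs=1)))
    loop.close()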

