commit: 389429d798a186bdbeb11354d7f1299f628851fd Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Thu Apr 9 04:45:16 2020 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Thu Apr 9 06:01:27 2020 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=389429d7
Scheduler: wakeup for empty merge queue (bug 711322) Add a wakeup callback to schedule a new merge when the merge queue becomes empty. This prevents the scheduler from hanging in cases where the order of _merge_exit callback invocation may cause the the merge queue to appear non-empty when it is about to become empty. Bug: https://bugs.gentoo.org/711322 Bug: https://bugs.gentoo.org/716636 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/_emerge/Scheduler.py | 23 +++++++++++++++++++++++ lib/_emerge/SequentialTaskQueue.py | 22 ++++++++++++---------- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py index ee8f3dd5e..2c0483230 100644 --- a/lib/_emerge/Scheduler.py +++ b/lib/_emerge/Scheduler.py @@ -27,6 +27,7 @@ bad = create_color_func("BAD") from portage._sets import SETPREFIX from portage._sets.base import InternalPackageSet from portage.util import ensure_dirs, writemsg, writemsg_level +from portage.util.futures import asyncio from portage.util.SlotObject import SlotObject from portage.util._async.SchedulerInterface import SchedulerInterface from portage.util._eventloop.EventLoop import EventLoop @@ -241,6 +242,7 @@ class Scheduler(PollScheduler): self._completed_tasks = set() self._main_exit = None self._main_loadavg_handle = None + self._schedule_merge_wakeup_task = None self._failed_pkgs = [] self._failed_pkgs_all = [] @@ -1440,6 +1442,9 @@ class Scheduler(PollScheduler): if self._job_delay_timeout_id is not None: self._job_delay_timeout_id.cancel() self._job_delay_timeout_id = None + if self._schedule_merge_wakeup_task is not None: + self._schedule_merge_wakeup_task.cancel() + self._schedule_merge_wakeup_task = None def _choose_pkg(self): """ @@ -1614,6 +1619,24 @@ class Scheduler(PollScheduler): self._main_loadavg_handle = self._event_loop.call_later( self._loadavg_latency, self._schedule) + # Failure to schedule *after* self._task_queues.merge becomes + # empty will cause the scheduler to hang as in bug 711322. + # Do not rely on scheduling which occurs via the _merge_exit + # method, since the order of callback invocation may cause + # self._task_queues.merge to appear non-empty when it is + # about to become empty. + if (self._task_queues.merge and (self._schedule_merge_wakeup_task is None + or self._schedule_merge_wakeup_task.done())): + self._schedule_merge_wakeup_task = asyncio.ensure_future( + self._task_queues.merge.wait(), loop=self._event_loop) + self._schedule_merge_wakeup_task.add_done_callback( + self._schedule_merge_wakeup) + + def _schedule_merge_wakeup(self, future): + if not future.cancelled(): + future.result() + self._schedule() + def _sigcont_handler(self, signum, frame): self._sigcont_time = time.time() diff --git a/lib/_emerge/SequentialTaskQueue.py b/lib/_emerge/SequentialTaskQueue.py index 656e5cf7c..d2551b1c6 100644 --- a/lib/_emerge/SequentialTaskQueue.py +++ b/lib/_emerge/SequentialTaskQueue.py @@ -1,9 +1,11 @@ -# Copyright 1999-2012 Gentoo Foundation +# Copyright 1999-2020 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 from collections import deque import sys +from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import coroutine from portage.util.SlotObject import SlotObject class SequentialTaskQueue(SlotObject): @@ -41,12 +43,6 @@ class SequentialTaskQueue(SlotObject): cancelled = getattr(task, "cancelled", None) if not cancelled: self.running_tasks.add(task) - # This callback will be invoked as soon as the task - # exits (before the future's done callback is called), - # and this is required in order for bool(self) to have - # an updated value for Scheduler._schedule to base - # assumptions upon. Delayed updates to bool(self) is - # what caused Scheduler to hang as in bug 711322. task.addExitListener(self._task_exit) task.start() finally: @@ -73,12 +69,18 @@ class SequentialTaskQueue(SlotObject): for task in list(self.running_tasks): task.cancel() + @coroutine def wait(self): """ - Synchronously wait for all running tasks to exit. + Wait for the queue to become empty. This method is a coroutine. """ - while self.running_tasks: - next(iter(self.running_tasks)).wait() + while self: + task = next(iter(self.running_tasks), None) + if task is None: + # Wait for self.running_tasks to populate. + yield asyncio.sleep(0) + else: + yield task.async_wait() def __bool__(self): return bool(self._task_queue or self.running_tasks)
