commit: 4df7a0a0c16c5ded65ad601d39840797b7704770
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Feb 23 21:44:58 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Feb 24 02:35:15 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=4df7a0a0
ForkExecutor: use async_start method
Also, fix AsynchronousTask.async_start to handle cancellation of the
_async_start coroutine, ensuring that start and exit listeners are
notified in this case (otherwise RetryForkExecutorTestCase will hang).
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/_emerge/AsynchronousTask.py | 15 +++++++++++++--
lib/portage/util/futures/executor/fork.py | 21 ++++++++++++++++++---
2 files changed, 31 insertions(+), 5 deletions(-)
diff --git a/lib/_emerge/AsynchronousTask.py b/lib/_emerge/AsynchronousTask.py
index d1e23cdf1..1e9e177cb 100644
--- a/lib/_emerge/AsynchronousTask.py
+++ b/lib/_emerge/AsynchronousTask.py
@@ -25,8 +25,19 @@ class AsynchronousTask(SlotObject):
@coroutine
def async_start(self):
- yield self._async_start()
- self._start_hook()
+ try:
+ if self._was_cancelled():
+ raise asyncio.CancelledError
+ yield self._async_start()
+ if self._was_cancelled():
+ raise asyncio.CancelledError
+ except asyncio.CancelledError:
+ self.cancel()
+ self._was_cancelled()
+ self._async_wait()
+ raise
+ finally:
+ self._start_hook()
@coroutine
def _async_start(self):
diff --git a/lib/portage/util/futures/executor/fork.py b/lib/portage/util/futures/executor/fork.py
index add7b3c9e..3549fdb31 100644
--- a/lib/portage/util/futures/executor/fork.py
+++ b/lib/portage/util/futures/executor/fork.py
@@ -13,6 +13,7 @@ import traceback
from portage.util._async.AsyncFunction import AsyncFunction
from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
from portage.util.cpuinfo import get_cpu_count
@@ -51,11 +52,25 @@ class ForkExecutor(object):
while (not self._shutdown and self._submit_queue and
len(self._running_tasks) < self._max_workers):
future, proc = self._submit_queue.popleft()
- future.add_done_callback(functools.partial(self._cancel_cb, proc))
- proc.addExitListener(functools.partial(self._proc_exit, future))
proc.scheduler = self._loop
- proc.start()
self._running_tasks[id(proc)] = proc
+ future.add_done_callback(functools.partial(self._cancel_cb, proc))
+ proc_future = asyncio.ensure_future(self._proc_coroutine(proc), loop=self._loop)
+ proc_future.add_done_callback(functools.partial(self._proc_coroutine_done, future, proc))
+
+ @coroutine
+ def _proc_coroutine(self, proc):
+ yield proc.async_start()
+ yield proc.async_wait()
+
+ def _proc_coroutine_done(self, future, proc, proc_future):
+ try:
+ proc_future.result()
+ except asyncio.CancelledError:
+ future.done() or future.cancel()
+ if proc.poll() is None:
+ proc.cancel()
+ self._proc_exit(future, proc)
def _cancel_cb(self, proc, future):
if future.cancelled():