lgtm. On Sat, Mar 31, 2018 at 10:46 PM, Zac Medico <zmed...@gentoo.org> wrote:
> This is useful for asynchronous operations that we might > need to cancel if they take too long, since > concurrent.futures.ProcessPoolExecutor tasks are not cancellable. > This ability to cancel tasks makes this executor useful > as an alternative to portage.exception.AlarmSignal. > > Also add an asyncio-compatible EventLoop.run_in_executor > method that uses ForkExecutor as the default executor, > which will later be used to implement the corresponding > asyncio.AbstractEventLoop run_in_executor method. > > Bug: https://bugs.gentoo.org/649588 > --- > pym/portage/util/_eventloop/EventLoop.py | 45 ++++++++- > pym/portage/util/futures/executor/__init__.py | 0 > pym/portage/util/futures/executor/fork.py | 130 > ++++++++++++++++++++++++++ > 3 files changed, 174 insertions(+), 1 deletion(-) > create mode 100644 pym/portage/util/futures/executor/__init__.py > create mode 100644 pym/portage/util/futures/executor/fork.py > > diff --git a/pym/portage/util/_eventloop/EventLoop.py > b/pym/portage/util/_eventloop/EventLoop.py > index f472a3dae..1574a6837 100644 > --- a/pym/portage/util/_eventloop/EventLoop.py > +++ b/pym/portage/util/_eventloop/EventLoop.py > @@ -24,6 +24,7 @@ except ImportError: > import portage > portage.proxy.lazyimport.lazyimport(globals(), > 'portage.util.futures.futures:_EventLoopFuture', > + 'portage.util.futures.executor.fork:ForkExecutor', > ) > > from portage import OrderedDict > @@ -122,6 +123,7 @@ class EventLoop(object): > self._idle_callbacks = OrderedDict() > self._timeout_handlers = {} > self._timeout_interval = None > + self._default_executor = None > > self._poll_obj = None > try: > @@ -721,6 +723,46 @@ class EventLoop(object): > return self._handle(self.timeout_add( > delay * 1000, self._call_soon_callback(callback, > args)), self) > > + def run_in_executor(self, executor, func, *args): > + """ > + Arrange for a func to be called in the specified executor. > + > + The executor argument should be an Executor instance. 
The > default > + executor is used if executor is None. > + > + Use functools.partial to pass keywords to the *func*. > + > + @param executor: executor > + @type executor: concurrent.futures.Executor or None > + @param func: a function to call > + @type func: callable > + @return: a Future > + @rtype: asyncio.Future (or compatible) > + """ > + if executor is None: > + executor = self._default_executor > + if executor is None: > + executor = ForkExecutor(loop=self) > + self._default_executor = executor > + return executor.submit(func, *args) > + > + def close(self): > + """Close the event loop. > + > + This clears the queues and shuts down the executor, > + and waits for it to finish. > + """ > + executor = self._default_executor > + if executor is not None: > + self._default_executor = None > + executor.shutdown(wait=True) > + > + if self._poll_obj is not None: > + close = getattr(self._poll_obj, 'close') > + if close is not None: > + close() > + self._poll_obj = None > + > > _can_poll_device = None > > @@ -782,10 +824,11 @@ class _epoll_adapter(object): > that is associated with an epoll instance will close automatically > when > it is garbage collected, so it's not necessary to close it > explicitly. 
> """ > - __slots__ = ('_epoll_obj',) > + __slots__ = ('_epoll_obj', 'close') > > def __init__(self, epoll_obj): > self._epoll_obj = epoll_obj > + self.close = epoll_obj.close > > def register(self, fd, *args): > self._epoll_obj.register(fd, *args) > diff --git a/pym/portage/util/futures/executor/__init__.py > b/pym/portage/util/futures/executor/__init__.py > new file mode 100644 > index 000000000..e69de29bb > diff --git a/pym/portage/util/futures/executor/fork.py > b/pym/portage/util/futures/executor/fork.py > new file mode 100644 > index 000000000..9cd1db2ca > --- /dev/null > +++ b/pym/portage/util/futures/executor/fork.py > @@ -0,0 +1,130 @@ > +# Copyright 2018 Gentoo Foundation > +# Distributed under the terms of the GNU General Public License v2 > + > +import collections > +import functools > +import multiprocessing > +import os > +import sys > +import traceback > + > +from portage.util._async.AsyncFunction import AsyncFunction > +from portage.util._eventloop.global_event_loop import global_event_loop > + > + > +class ForkExecutor(object): > + """ > + An implementation of concurrent.futures.Executor that forks a > + new process for each task, with support for cancellation of tasks. > + > + This is entirely driven by an event loop. > + """ > + def __init__(self, max_workers=None, loop=None): > + self._max_workers = max_workers or > multiprocessing.cpu_count() > + self._loop = loop or global_event_loop() > + self._submit_queue = collections.deque() > + self._running_tasks = {} > + self._shutdown = False > + self._shutdown_future = self._loop.create_future() > + > + def submit(self, fn, *args, **kwargs): > + """Submits a callable to be executed with the given > arguments. > + > + Schedules the callable to be executed as fn(*args, > **kwargs) and returns > + a Future instance representing the execution of the > callable. > + > + Returns: > + A Future representing the given call. 
> + """ > + future = self._loop.create_future() > + proc = AsyncFunction(target=functools.partial( > + self._guarded_fn_call, fn, args, kwargs)) > + self._submit_queue.append((future, proc)) > + self._schedule() > + return future > + > + def _schedule(self): > + while (not self._shutdown and self._submit_queue and > + len(self._running_tasks) < self._max_workers): > + future, proc = self._submit_queue.popleft() > + > future.add_done_callback(functools.partial(self._cancel_cb, > proc)) > + > proc.addExitListener(functools.partial(self._proc_exit, > future)) > + proc.scheduler = self._loop > + proc.start() > + self._running_tasks[id(proc)] = proc > + > + def _cancel_cb(self, proc, future): > + if future.cancelled(): > + # async, handle the rest in _proc_exit > + proc.cancel() > + > + @staticmethod > + def _guarded_fn_call(fn, args, kwargs): > + try: > + result = fn(*args, **kwargs) > + exception = None > + except Exception as e: > + result = None > + exception = _ExceptionWithTraceback(e) > + > + return result, exception > + > + def _proc_exit(self, future, proc): > + if not future.cancelled(): > + if proc.returncode == os.EX_OK: > + result, exception = proc.result > + if exception is not None: > + future.set_exception(exception) > + else: > + future.set_result(result) > + else: > + # TODO: add special exception class for > this, maybe > + # distinguish between kill and crash > + future.set_exception( > + Exception('pid {} crashed or > killed, exitcode {}'.\ > + format(proc.pid, > proc.returncode))) > + > + del self._running_tasks[id(proc)] > + self._schedule() > + if self._shutdown and not self._running_tasks: > + self._shutdown_future.set_result(None) > + > + def shutdown(self, wait=True): > + self._shutdown = True > + if wait: > + self._loop.run_until_complete( > self._shutdown_future) > + > + def __enter__(self): > + return self > + > + def __exit__(self, exc_type, exc_val, exc_tb): > + self.shutdown(wait=True) > + return False > + > + > +class 
_ExceptionWithTraceback: > + def __init__(self, exc): > + tb = traceback.format_exception(type(exc), exc, > exc.__traceback__) > + tb = ''.join(tb) > + self.exc = exc > + self.tb = '\n"""\n%s"""' % tb > + def __reduce__(self): > + return _rebuild_exc, (self.exc, self.tb) > + > + > +class _RemoteTraceback(Exception): > + def __init__(self, tb): > + self.tb = tb > + def __str__(self): > + return self.tb > + > + > +def _rebuild_exc(exc, tb): > + exc.__cause__ = _RemoteTraceback(tb) > + return exc > + > + > +if sys.version_info < (3,): > + # Python 2 does not support exception chaining, so > + # don't bother to preserve the traceback. > + _ExceptionWithTraceback = lambda exc: exc > -- > 2.13.6 > > >