lgtm.

On Sat, Mar 31, 2018 at 10:46 PM, Zac Medico <zmed...@gentoo.org> wrote:

> This is useful for asynchronous operations that we might
> need to cancel if they take too long, since
> concurrent.futures.ProcessPoolExecutor tasks are not cancellable.
> This ability to cancel tasks makes this executor useful
> as an alternative to portage.exception.AlarmSignal.
>
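
To make the AlarmSignal comparison concrete for anyone following along,
here is roughly what a timeout-and-cancel flow could look like with this
executor. This is only a sketch on my side, not part of the patch;
possibly_hanging_call, cancel_if_unfinished and the 5 second timeout are
made up for illustration:

    import time

    from portage.util._eventloop.global_event_loop import global_event_loop
    from portage.util.futures.executor.fork import ForkExecutor

    def possibly_hanging_call():
        # stand-in for the kind of call AlarmSignal used to interrupt
        time.sleep(3600)

    loop = global_event_loop()
    executor = ForkExecutor(loop=loop)
    future = executor.submit(possibly_hanging_call)

    def cancel_if_unfinished():
        # future.cancel() triggers _cancel_cb, which kills the forked child
        if not future.done():
            future.cancel()
        return False  # one-shot timeout, do not reschedule

    # timeout_add() takes milliseconds on this event loop
    loop.timeout_add(5000, cancel_if_unfinished)

    try:
        loop.run_until_complete(future)
    except Exception:
        # retrieving the result of a cancelled future may raise here
        pass

    if future.cancelled():
        # roughly the point where AlarmSignal would have been raised before
        pass
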
> Also add an asyncio-compatible EventLoop.run_in_executor
> method that uses ForkExecutor as the default executor,
> which will later be used to implement the corresponding
> asyncio.AbstractEventLoop run_in_executor method.
>
> Bug: https://bugs.gentoo.org/649588
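
A quick sketch of the caller's side of run_in_executor() as well (again
illustration only; expensive_work is a made-up stand-in, and passing None
selects the default ForkExecutor that this patch wires up):

    from portage.util._eventloop.global_event_loop import global_event_loop

    def expensive_work(n):
        # CPU-bound work that runs in a forked child instead of blocking
        # the event loop
        return sum(i * i for i in range(n))

    loop = global_event_loop()

    # executor=None creates (and then reuses) the loop's default ForkExecutor
    future = loop.run_in_executor(None, expensive_work, 10 ** 6)
    loop.run_until_complete(future)
    total = future.result()

As the docstring says, keyword arguments would have to be bound with
functools.partial before being passed to run_in_executor().
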
> ---
>  pym/portage/util/_eventloop/EventLoop.py      |  45 ++++++++-
>  pym/portage/util/futures/executor/__init__.py |   0
>  pym/portage/util/futures/executor/fork.py     | 130 ++++++++++++++++++++++++++
>  3 files changed, 174 insertions(+), 1 deletion(-)
>  create mode 100644 pym/portage/util/futures/executor/__init__.py
>  create mode 100644 pym/portage/util/futures/executor/fork.py
>
> diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
> index f472a3dae..1574a6837 100644
> --- a/pym/portage/util/_eventloop/EventLoop.py
> +++ b/pym/portage/util/_eventloop/EventLoop.py
> @@ -24,6 +24,7 @@ except ImportError:
>  import portage
>  portage.proxy.lazyimport.lazyimport(globals(),
>         'portage.util.futures.futures:_EventLoopFuture',
> +       'portage.util.futures.executor.fork:ForkExecutor',
>  )
>
>  from portage import OrderedDict
> @@ -122,6 +123,7 @@ class EventLoop(object):
>                 self._idle_callbacks = OrderedDict()
>                 self._timeout_handlers = {}
>                 self._timeout_interval = None
> +               self._default_executor = None
>
>                 self._poll_obj = None
>                 try:
> @@ -721,6 +723,46 @@ class EventLoop(object):
>                 return self._handle(self.timeout_add(
>                         delay * 1000, self._call_soon_callback(callback, args)), self)
>
> +       def run_in_executor(self, executor, func, *args):
> +               """
> +               Arrange for a func to be called in the specified executor.
> +
> +               The executor argument should be an Executor instance. The default
> +               executor is used if executor is None.
> +
> +               Use functools.partial to pass keywords to the *func*.
> +
> +               @param executor: executor
> +               @type executor: concurrent.futures.Executor or None
> +               @param func: a function to call
> +               @type func: callable
> +               @return: a Future
> +               @rtype: asyncio.Future (or compatible)
> +               """
> +               if executor is None:
> +                       executor = self._default_executor
> +                       if executor is None:
> +                               executor = ForkExecutor(loop=self)
> +                               self._default_executor = executor
> +               return executor.submit(func, *args)
> +
> +       def close(self):
> +               """Close the event loop.
> +
> +               This clears the queues and shuts down the executor,
> +               and waits for it to finish.
> +               """
> +               executor = self._default_executor
> +               if executor is not None:
> +                       self._default_executor = None
> +                       executor.shutdown(wait=True)
> +
> +               if self._poll_obj is not None:
> +                       close = getattr(self._poll_obj, 'close', None)
> +                       if close is not None:
> +                               close()
> +                       self._poll_obj = None
> +
>
>  _can_poll_device = None
>
> @@ -782,10 +824,11 @@ class _epoll_adapter(object):
>         that is associated with an epoll instance will close automatically when
>         it is garbage collected, so it's not necessary to close it explicitly.
>         """
> -       __slots__ = ('_epoll_obj',)
> +       __slots__ = ('_epoll_obj', 'close')
>
>         def __init__(self, epoll_obj):
>                 self._epoll_obj = epoll_obj
> +               self.close = epoll_obj.close
>
>         def register(self, fd, *args):
>                 self._epoll_obj.register(fd, *args)
> diff --git a/pym/portage/util/futures/executor/__init__.py b/pym/portage/util/futures/executor/__init__.py
> new file mode 100644
> index 000000000..e69de29bb
> diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py
> new file mode 100644
> index 000000000..9cd1db2ca
> --- /dev/null
> +++ b/pym/portage/util/futures/executor/fork.py
> @@ -0,0 +1,130 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import collections
> +import functools
> +import multiprocessing
> +import os
> +import sys
> +import traceback
> +
> +from portage.util._async.AsyncFunction import AsyncFunction
> +from portage.util._eventloop.global_event_loop import global_event_loop
> +
> +
> +class ForkExecutor(object):
> +       """
> +       An implementation of concurrent.futures.Executor that forks a
> +       new process for each task, with support for cancellation of tasks.
> +
> +       This is entirely driven by an event loop.
> +       """
> +       def __init__(self, max_workers=None, loop=None):
> +               self._max_workers = max_workers or multiprocessing.cpu_count()
> +               self._loop = loop or global_event_loop()
> +               self._submit_queue = collections.deque()
> +               self._running_tasks = {}
> +               self._shutdown = False
> +               self._shutdown_future = self._loop.create_future()
> +
> +       def submit(self, fn, *args, **kwargs):
> +               """Submits a callable to be executed with the given arguments.
> +
> +               Schedules the callable to be executed as fn(*args, **kwargs) and returns
> +               a Future instance representing the execution of the callable.
> +
> +               Returns:
> +                       A Future representing the given call.
> +               """
> +               future = self._loop.create_future()
> +               proc = AsyncFunction(target=functools.partial(
> +                       self._guarded_fn_call, fn, args, kwargs))
> +               self._submit_queue.append((future, proc))
> +               self._schedule()
> +               return future
> +
> +       def _schedule(self):
> +               while (not self._shutdown and self._submit_queue and
> +                       len(self._running_tasks) < self._max_workers):
> +                       future, proc = self._submit_queue.popleft()
> +                       future.add_done_callback(functools.partial(self._cancel_cb, proc))
> +                       proc.addExitListener(functools.partial(self._proc_exit, future))
> +                       proc.scheduler = self._loop
> +                       proc.start()
> +                       self._running_tasks[id(proc)] = proc
> +
> +       def _cancel_cb(self, proc, future):
> +               if future.cancelled():
> +                       # async, handle the rest in _proc_exit
> +                       proc.cancel()
> +
> +       @staticmethod
> +       def _guarded_fn_call(fn, args, kwargs):
> +               try:
> +                       result = fn(*args, **kwargs)
> +                       exception = None
> +               except Exception as e:
> +                       result = None
> +                       exception = _ExceptionWithTraceback(e)
> +
> +               return result, exception
> +
> +       def _proc_exit(self, future, proc):
> +               if not future.cancelled():
> +                       if proc.returncode == os.EX_OK:
> +                               result, exception = proc.result
> +                               if exception is not None:
> +                                       future.set_exception(exception)
> +                               else:
> +                                       future.set_result(result)
> +                       else:
> +                               # TODO: add special exception class for this, maybe
> +                               # distinguish between kill and crash
> +                               future.set_exception(
> +                                       Exception('pid {} crashed or killed, exitcode {}'.\
> +                                               format(proc.pid, proc.returncode)))
> +
> +               del self._running_tasks[id(proc)]
> +               self._schedule()
> +               if self._shutdown and not self._running_tasks:
> +                       self._shutdown_future.set_result(None)
> +
> +       def shutdown(self, wait=True):
> +               self._shutdown = True
> +               if wait and self._running_tasks:
> +                       self._loop.run_until_complete(self._shutdown_future)
> +
> +       def __enter__(self):
> +               return self
> +
> +       def __exit__(self, exc_type, exc_val, exc_tb):
> +               self.shutdown(wait=True)
> +               return False
> +
> +
> +class _ExceptionWithTraceback:
> +       def __init__(self, exc):
> +               tb = traceback.format_exception(type(exc), exc, exc.__traceback__)
> +               tb = ''.join(tb)
> +               self.exc = exc
> +               self.tb = '\n"""\n%s"""' % tb
> +       def __reduce__(self):
> +               return _rebuild_exc, (self.exc, self.tb)
> +
> +
> +class _RemoteTraceback(Exception):
> +       def __init__(self, tb):
> +               self.tb = tb
> +       def __str__(self):
> +               return self.tb
> +
> +
> +def _rebuild_exc(exc, tb):
> +       exc.__cause__ = _RemoteTraceback(tb)
> +       return exc
> +
> +
> +if sys.version_info < (3,):
> +       # Python 2 does not support exception chaining, so
> +       # don't bother to preserve the traceback.
> +       _ExceptionWithTraceback = lambda exc: exc
> --
> 2.13.6
>
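
One more usage sketch, since ForkExecutor also gains __enter__/__exit__:
it can be used as a context manager, and submit() accepts keyword arguments
directly (unlike run_in_executor(), which needs functools.partial for them).
hash_data below is invented for the example:

    import hashlib

    from portage.util._eventloop.global_event_loop import global_event_loop
    from portage.util.futures.executor.fork import ForkExecutor

    def hash_data(data, algorithm='sha1'):
        # placeholder for CPU-bound work done in the forked child
        return hashlib.new(algorithm, data).hexdigest()

    loop = global_event_loop()

    with ForkExecutor(loop=loop) as executor:
        future = executor.submit(hash_data, b'portage', algorithm='sha256')

    # __exit__ called shutdown(wait=True), so the task has completed by now
    digest = future.result()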