This is useful for asynchronous operations that we might
need to cancel if they take too long, since
concurrent.futures.ProcessPoolExecutor tasks are not cancellable.
This ability to cancel tasks makes this executor useful
as an alternative to portage.exception.AlarmSignal.

Also add an asyncio-compatible EventLoop.run_in_executor
method that uses ForkExecutor as the default executor,
which will later be used to implement the corresponding
asyncio.AbstractEventLoop run_in_executor method.

Bug: https://bugs.gentoo.org/649588
---
 pym/portage/util/_eventloop/EventLoop.py      |  45 ++++++++-
 pym/portage/util/futures/executor/__init__.py |   0
 pym/portage/util/futures/executor/fork.py     | 130 ++++++++++++++++++++++++++
 3 files changed, 174 insertions(+), 1 deletion(-)
 create mode 100644 pym/portage/util/futures/executor/__init__.py
 create mode 100644 pym/portage/util/futures/executor/fork.py

diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index f472a3dae..1574a6837 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -24,6 +24,7 @@ except ImportError:
 import portage
 portage.proxy.lazyimport.lazyimport(globals(),
        'portage.util.futures.futures:_EventLoopFuture',
+       'portage.util.futures.executor.fork:ForkExecutor',
 )
 
 from portage import OrderedDict
@@ -122,6 +123,7 @@ class EventLoop(object):
                self._idle_callbacks = OrderedDict()
                self._timeout_handlers = {}
                self._timeout_interval = None
+               self._default_executor = None
 
                self._poll_obj = None
                try:
@@ -721,6 +723,46 @@ class EventLoop(object):
                return self._handle(self.timeout_add(
                        delay * 1000, self._call_soon_callback(callback, 
args)), self)
 
+       def run_in_executor(self, executor, func, *args):
+               """
+               Submit func(*args) to an executor and return the result of
+               the executor's submit method.
+
+               When executor is None the loop's default executor is used.
+               The default is a ForkExecutor bound to this loop; it is
+               created on first use and cached in self._default_executor.
+
+               Use functools.partial to pass keywords to the *func*.
+
+               @param executor: executor to submit to, or None for the default
+               @type executor: concurrent.futures.Executor or None
+               @param func: a function to call
+               @type func: callable
+               @return: a Future
+               @rtype: asyncio.Future (or compatible)
+               """
+               if executor is not None:
+                       return executor.submit(func, *args)
+
+               # Lazily create the default executor on first use.
+               if self._default_executor is None:
+                       self._default_executor = ForkExecutor(loop=self)
+               return self._default_executor.submit(func, *args)
+
+       def close(self):
+               """Close the event loop.
+
+               This shuts down the default executor (if one was created)
+               and waits for it to finish, then closes the underlying
+               poll object if it provides a close method.
+               """
+               executor = self._default_executor
+               if executor is not None:
+                       # Drop the reference first, so that a second call to
+                       # close does not shut the same executor down again.
+                       self._default_executor = None
+                       executor.shutdown(wait=True)
+
+               if self._poll_obj is not None:
+                       # The poll object may not provide a close method, so a
+                       # default is required here: without it getattr raises
+                       # AttributeError and the None check below is dead code.
+                       close = getattr(self._poll_obj, 'close', None)
+                       if close is not None:
+                               close()
+                       self._poll_obj = None
+
 
 _can_poll_device = None
 
@@ -782,10 +824,11 @@ class _epoll_adapter(object):
        that is associated with an epoll instance will close automatically when
        it is garbage collected, so it's not necessary to close it explicitly.
        """
-       __slots__ = ('_epoll_obj',)
+       __slots__ = ('_epoll_obj', 'close')
 
        def __init__(self, epoll_obj):
                self._epoll_obj = epoll_obj
+               self.close = epoll_obj.close
 
        def register(self, fd, *args):
                self._epoll_obj.register(fd, *args)
diff --git a/pym/portage/util/futures/executor/__init__.py b/pym/portage/util/futures/executor/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pym/portage/util/futures/executor/fork.py b/pym/portage/util/futures/executor/fork.py
new file mode 100644
index 000000000..9cd1db2ca
--- /dev/null
+++ b/pym/portage/util/futures/executor/fork.py
@@ -0,0 +1,130 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import collections
+import functools
+import multiprocessing
+import os
+import sys
+import traceback
+
+from portage.util._async.AsyncFunction import AsyncFunction
+from portage.util._eventloop.global_event_loop import global_event_loop
+
+
+class ForkExecutor(object):
+       """
+       An implementation of concurrent.futures.Executor that forks a
+       new process for each task, with support for cancellation of tasks.
+
+       This is entirely driven by an event loop.
+       """
+       def __init__(self, max_workers=None, loop=None):
+               """
+               @param max_workers: maximum number of tasks running at once,
+                       defaults to multiprocessing.cpu_count()
+               @type max_workers: int or None
+               @param loop: event loop used to create futures and to run
+                       the tasks, defaults to global_event_loop()
+               """
+               self._max_workers = max_workers or multiprocessing.cpu_count()
+               self._loop = loop or global_event_loop()
+               # (future, proc) pairs that have been submitted but not started
+               self._submit_queue = collections.deque()
+               # started tasks, keyed by id(proc)
+               self._running_tasks = {}
+               self._shutdown = False
+               # resolved once shutdown was requested and nothing is running
+               self._shutdown_future = self._loop.create_future()
+
+       def submit(self, fn, *args, **kwargs):
+               """Submits a callable to be executed with the given arguments.
+
+               Schedules the callable to be executed as fn(*args, **kwargs)
+               and returns a Future instance representing the execution of
+               the callable.
+
+               Returns:
+                       A Future representing the given call.
+               """
+               future = self._loop.create_future()
+               proc = AsyncFunction(target=functools.partial(
+                       self._guarded_fn_call, fn, args, kwargs))
+               self._submit_queue.append((future, proc))
+               self._schedule()
+               return future
+
+       def _schedule(self):
+               """
+               Start queued tasks until the queue is empty or max_workers
+               tasks are running. Nothing is started after shutdown.
+               """
+               while (not self._shutdown and self._submit_queue and
+                       len(self._running_tasks) < self._max_workers):
+                       future, proc = self._submit_queue.popleft()
+                       future.add_done_callback(functools.partial(self._cancel_cb, proc))
+                       proc.addExitListener(functools.partial(self._proc_exit, future))
+                       proc.scheduler = self._loop
+                       proc.start()
+                       self._running_tasks[id(proc)] = proc
+
+       def _cancel_cb(self, proc, future):
+               """
+               Done callback of a task's future: cancel the process if the
+               future was cancelled.
+               """
+               if future.cancelled():
+                       # async, handle the rest in _proc_exit
+                       proc.cancel()
+
+       @staticmethod
+       def _guarded_fn_call(fn, args, kwargs):
+               """
+               Call fn(*args, **kwargs) and return a (result, exception)
+               pair instead of raising, so that an exception can be handed
+               back through the task's result.
+               """
+               try:
+                       result = fn(*args, **kwargs)
+                       exception = None
+               except Exception as e:
+                       result = None
+                       exception = _ExceptionWithTraceback(e)
+
+               return result, exception
+
+       def _proc_exit(self, future, proc):
+               """
+               Exit listener of a task's process: transfer the outcome to
+               the future (unless it was cancelled), then start queued tasks
+               and resolve the shutdown future if this was the last task.
+               """
+               if not future.cancelled():
+                       if proc.returncode == os.EX_OK:
+                               result, exception = proc.result
+                               if exception is not None:
+                                       future.set_exception(exception)
+                               else:
+                                       future.set_result(result)
+                       else:
+                               # TODO: add special exception class for this, maybe
+                               # distinguish between kill and crash
+                               future.set_exception(
+                                       Exception('pid {} crashed or killed, exitcode {}'.\
+                                               format(proc.pid, proc.returncode)))
+
+               del self._running_tasks[id(proc)]
+               self._schedule()
+               if self._shutdown and not self._running_tasks:
+                       self._shutdown_future.set_result(None)
+
+       def shutdown(self, wait=True):
+               """
+               Stop starting new tasks, and optionally wait for the tasks
+               that are already running.
+
+               Tasks that are still queued when shutdown is called are
+               never started, so their futures remain pending.
+
+               @param wait: if True, run the event loop until all running
+                       tasks have exited
+               @type wait: bool
+               """
+               self._shutdown = True
+               # _proc_exit only resolves _shutdown_future when a task exits,
+               # so if nothing is running it has to be resolved here,
+               # otherwise run_until_complete below would never return.
+               if not self._running_tasks and not self._shutdown_future.done():
+                       self._shutdown_future.set_result(None)
+               if wait:
+                       self._loop.run_until_complete(self._shutdown_future)
+
+       def __enter__(self):
+               return self
+
+       def __exit__(self, exc_type, exc_val, exc_tb):
+               # Wait for running tasks; never suppress the exception.
+               self.shutdown(wait=True)
+               return False
+
+
+class _ExceptionWithTraceback:
+       """
+       Wrap an exception together with its formatted traceback. When
+       reduced (e.g. for pickling) the wrapper is rebuilt via _rebuild_exc
+       from the exception and the traceback text.
+       """
+       def __init__(self, exc):
+               formatted = ''.join(traceback.format_exception(
+                       type(exc), exc, exc.__traceback__))
+               self.exc = exc
+               # The traceback text is stored enclosed in triple quotes.
+               self.tb = '\n"""\n%s"""' % formatted
+       def __reduce__(self):
+               rebuild_args = (self.exc, self.tb)
+               return _rebuild_exc, rebuild_args
+
+
+class _RemoteTraceback(Exception):
+       """
+       Exception whose string form is the traceback text it was
+       constructed with.
+       """
+       def __init__(self, tb):
+               # tb is the already formatted traceback text
+               self.tb = tb
+       def __str__(self):
+               text = self.tb
+               return text
+
+
+def _rebuild_exc(exc, tb):
+       """
+       Attach the traceback text tb to exc as its __cause__, wrapped in
+       a _RemoteTraceback, and return exc.
+       """
+       remote_cause = _RemoteTraceback(tb)
+       exc.__cause__ = remote_cause
+       return exc
+
+
+if sys.version_info[0] < 3:
+       # Exception chaining is not supported by Python 2, so the wrapper
+       # is replaced with an identity function there and the exception is
+       # passed through without its formatted traceback.
+       _ExceptionWithTraceback = lambda exc: exc
-- 
2.13.6


Reply via email to