cozos commented on code in PR #26526:
URL: https://github.com/apache/beam/pull/26526#discussion_r1185957495
##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2321,6 +2348,43 @@ def _remote_teardown(cls):
cls._fn = None
+class _TimeoutDoFn(DoFn):
+ """Process method run in a separate thread allowing timeouts.
+ """
+ def __init__(self, fn, timeout=None):
+ self._fn = fn
+ self._timeout = timeout
+ self._pool = None
+
+ def __getattribute__(self, name):
+ if (name.startswith('__') or name in self.__dict__ or
+ name in type(self).__dict__):
+ return object.__getattribute__(self, name)
+ else:
+ return getattr(self._fn, name)
+
+ def process(self, *args, **kwargs):
+ if self._pool is None:
+ self._pool = concurrent.futures.ThreadPoolExecutor(10)
+ # Ensure we iterate over the entire output list in the given amount of time.
+ try:
+ return self._pool.submit(
+ lambda: list(self._fn.process(*args, **kwargs))).result(
+ self._timeout)
+ except TimeoutError:
+ self._pool.shutdown(wait=False)
+ self._pool = None
+ raise
+
+ def teardown(self):
+ try:
+ self._fn.teardown()
+ finally:
+ if self._pool is not None:
+ self._pool.shutdown(wait=False)
Review Comment:
@robertwb I think this comes with lots of problems. `shutdown(wait=False)` is
non-blocking, but actively running threads will NOT be cancelled. That means that
the threads in the pool will become zombies - they continue running, consuming
memory and CPU resources until their execution finishes - try putting an infinite
loop in the TimeoutDoFn. Users who use this will think they have a memory leak.
From
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown
> If wait is False then this method will return immediately and the
resources associated with the executor will be freed when all pending futures
are done executing. Regardless of the value of wait, the entire Python program
will not exit until all pending futures are done executing.
>
> If cancel_futures is True, this method will cancel all pending futures
that the executor has not started running. Any futures that are completed or
running won’t be cancelled, regardless of the value of cancel_futures.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]