cozos commented on code in PR #26526:
URL: https://github.com/apache/beam/pull/26526#discussion_r1185957495


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2321,6 +2348,43 @@ def _remote_teardown(cls):
     cls._fn = None
 
 
+class _TimeoutDoFn(DoFn):
+  """Process method run in a separate thread allowing timeouts.
+  """
+  def __init__(self, fn, timeout=None):
+    self._fn = fn
+    self._timeout = timeout
+    self._pool = None
+
+  def __getattribute__(self, name):
+    if (name.startswith('__') or name in self.__dict__ or
+        name in type(self).__dict__):
+      return object.__getattribute__(self, name)
+    else:
+      return getattr(self._fn, name)
+
+  def process(self, *args, **kwargs):
+    if self._pool is None:
+      self._pool = concurrent.futures.ThreadPoolExecutor(10)
+    # Ensure we iterate over the entire output list in the given amount of time.
+    try:
+      return self._pool.submit(
+          lambda: list(self._fn.process(*args, **kwargs))).result(
+              self._timeout)
+    except TimeoutError:
+      self._pool.shutdown(wait=False)
+      self._pool = None
+      raise
+
+  def teardown(self):
+    try:
+      self._fn.teardown()
+    finally:
+      if self._pool is not None:
+        self._pool.shutdown(wait=False)

Review Comment:
   @robertwb I think this comes with lots of problems. `shutdown(wait=False)` is 
non-blocking, but actively running threads will NOT be cancelled. That means that 
the threads in the pool will become zombies - they continue running, consuming 
memory and CPU resources until their execution finishes. Users who use this will 
think they have a memory leak. 
   
   From 
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown
   
   ```
   If wait is False then this method will return immediately and the resources 
associated with the executor will be freed when all pending futures are done 
executing. Regardless of the value of wait, the entire Python program will not 
exit until all pending futures are done executing.
   
   If cancel_futures is True, this method will cancel all pending futures that 
the executor has not started running. Any futures that are completed or running 
won’t be cancelled, regardless of the value of cancel_futures.
   ```
   
   Try putting an infinite loop in the TimeoutDoFn.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to