cozos commented on code in PR #26526:
URL: https://github.com/apache/beam/pull/26526#discussion_r1185957495


##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -2321,6 +2348,43 @@ def _remote_teardown(cls):
     cls._fn = None
 
 
+class _TimeoutDoFn(DoFn):
+  """Process method run in a separate thread allowing timeouts.
+  """
+  def __init__(self, fn, timeout=None):
+    self._fn = fn
+    self._timeout = timeout
+    self._pool = None
+
+  def __getattribute__(self, name):
+    if (name.startswith('__') or name in self.__dict__ or
+        name in type(self).__dict__):
+      return object.__getattribute__(self, name)
+    else:
+      return getattr(self._fn, name)
+
+  def process(self, *args, **kwargs):
+    if self._pool is None:
+      self._pool = concurrent.futures.ThreadPoolExecutor(10)
+    # Ensure we iterate over the entire output list in the given amount of time.
+    try:
+      return self._pool.submit(
+          lambda: list(self._fn.process(*args, **kwargs))).result(
+              self._timeout)
+    except TimeoutError:
+      self._pool.shutdown(wait=False)
+      self._pool = None
+      raise
+
+  def teardown(self):
+    try:
+      self._fn.teardown()
+    finally:
+      if self._pool is not None:
+        self._pool.shutdown(wait=False)

Review Comment:
   @robertwb I think this comes with lots of problems. `shutdown(wait=False)` is
non-blocking, but actively running threads will NOT be cancelled. That means that
the threads in the pool will become zombies - they continue running, consuming
memory and CPU resources until their execution finishes. Users who use this will
think they have a memory leak.
   
   From 
https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown
   
   > If wait is False then this method will return immediately and the 
resources associated with the executor will be freed when all pending futures 
are done executing. Regardless of the value of wait, the entire Python program 
will not exit until all pending futures are done executing.
   >
   > If cancel_futures is True, this method will cancel all pending futures 
that the executor has not started running. Any futures that are completed or 
running won’t be cancelled, regardless of the value of cancel_futures.
   
   
   Try putting an infinite loop in the TimeoutDoFn.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to