scwhittle commented on code in PR #35391:
URL: https://github.com/apache/beam/pull/35391#discussion_r2253555140


##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -207,6 +208,7 @@ def __init__(
     self._profiler_factory = profiler_factory
     self.data_sampler = data_sampler
     self.runner_capabilities = runner_capabilities
+    self.element_processing_timeout_minutes = element_processing_timeout_minutes

Review Comment:
   Should this have a `_` prefix? It seems internal.



##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -230,7 +232,12 @@ def default_factory(id):
             status_address,
             self._bundle_processor_cache,
             self._state_cache,
-            enable_heap_dump)  # type: Optional[FnApiWorkerStatusHandler]
+            enable_heap_dump,
+            element_processing_timeout_minutes=self.
+            element_processing_timeout_minutes
+        )  # type: Optional[FnApiWorkerStatusHandler]
+      except TimeoutError as e:

Review Comment:
   This doesn't seem like the right place to handle the error, since this code 
only constructs the status handler. The error will be raised later, after 
processing has started, on the thread that the status handler starts:
   ```
    self._lull_logger = threading.Thread(
   ```
   
   I'm not a Python expert, so I'm not sure of the best way to clean up and 
terminate once we decide to. Raising an unhandled error from that thread might 
terminate the SDK, or there may be a better way to do it.
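
For reference, a minimal standalone sketch (not Beam code) of how standard Python (3.8+) treats an exception that escapes a background thread. The names `watchdog` and `record_hook` are hypothetical stand-ins; the only library API used is `threading.Thread` and `threading.excepthook`.

```python
import threading

captured = []


def watchdog():
    # Hypothetical stand-in for a lull-logging thread that detects a timeout.
    raise TimeoutError("element processing exceeded the timeout")


def record_hook(args):
    # threading.excepthook is called for exceptions that escape Thread.run().
    # args carries exc_type, exc_value, exc_traceback and thread.
    captured.append(args.exc_type)


# Replace the default hook (which prints a traceback) with the recorder.
threading.excepthook = record_hook

t = threading.Thread(target=watchdog, daemon=True)
t.start()
t.join()

# Execution reaches this line: the exception ended only the watchdog thread,
# not the main thread or the process.
still_running = True
```

In this sketch the uncaught `TimeoutError` ends only the thread that raised it, so raising from the lull-logging thread alone would likely not stop the SDK worker. Some explicit shutdown step would be needed, for example a handler that signals the main thread, or a call to `os._exit`, which exits immediately without running cleanup handlers. Which option suits the SDK harness is left open here.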


