tvalentyn commented on code in PR #35391:
URL: https://github.com/apache/beam/pull/35391#discussion_r2258144206


##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1455,6 +1455,14 @@ def _add_argparse_args(cls, parser):
             'responsible for executing the user code and communicating with '
             'the runner. Depending on the runner, there may be more than one '
             'SDK Harness process running on the same worker node.'))
+    parser.add_argument(
+        '--element_processing_timeout_minutes',
+        type=int,
+        default=None,
+        help=(
+            'The time limit (minute) that an SDK worker allows for a PTransform'

Review Comment:
   ```suggestion
               'The time limit (in minutes) that an SDK worker allows for a PTransform'
   ```
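For reference, here is a minimal sketch of how this flag could be parsed and converted into the nanosecond threshold that `worker_status.py` checks (`_element_processing_timeout_ns`). It uses plain `argparse` rather than Beam's `PipelineOptions`, and `timeout_minutes_to_ns` is a hypothetical helper. Treating an unset or zero value as "disabled" is an assumption based on the `if self._element_processing_timeout_ns:` check further down.

```python
import argparse
from typing import Optional

# Declared the same way as in the diff: an int flag that defaults to None.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--element_processing_timeout_minutes', type=int, default=None)

_NANOS_PER_MINUTE = 60 * 1_000_000_000


def timeout_minutes_to_ns(minutes: Optional[int]) -> Optional[int]:
  """Hypothetical helper: converts the flag value to nanoseconds.

  Returns None when the flag is unset (or 0), so a truthiness check such as
  `if self._element_processing_timeout_ns:` leaves the feature disabled.
  """
  if not minutes:
    return None
  return minutes * _NANOS_PER_MINUTE


args = parser.parse_args(['--element_processing_timeout_minutes', '5'])
timeout_ns = timeout_minutes_to_ns(args.element_processing_timeout_minutes)
print(timeout_ns)  # 300000000000
```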



##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -230,7 +232,12 @@ def default_factory(id):
             status_address,
             self._bundle_processor_cache,
             self._state_cache,
-            enable_heap_dump)  # type: Optional[FnApiWorkerStatusHandler]
+            enable_heap_dump,
+            element_processing_timeout_minutes=self.
+            element_processing_timeout_minutes
+        )  # type: Optional[FnApiWorkerStatusHandler]
+      except TimeoutError as e:

Review Comment:
   +1, the TimeoutError won't be caught here: this `try` block finishes after the status handler is constructed.
   
   +1. The error will be raised on the status handler thread and go uncaught, which should terminate only the status handler thread. The main thread will keep running, so the SDK process shouldn't terminate. (Did you see different behavior when you tested this change?)
    
   Exiting from the status handler thread should work (if we called `_shutdown_due_to_element_processing_timeout` from the status handler). However, it would be better to flush the logs first, as the main thread attempts to do here:
https://github.com/apache/beam/blob/4114f7c314a8ae3dccc9e2b78f2474b6337a6e7c/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L214-L219
   
   We could do this:
       - make the log handler a process-level global variable
       - add helpers in sdk_worker_main that flush the log handler (if defined) and shut down the process; these would be callable from any thread
       - call these helpers from the worker status thread when the timeout is reached
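The steps above could be sketched roughly as follows. All names here (`_FN_LOG_HANDLER`, `set_process_log_handler`, `flush_logs_and_terminate`) are hypothetical, and the sketch calls the standard `logging.Handler.flush()`; whether Beam's harness log handler needs `close()` instead to drain its buffer is not established here. The exit function is injectable so the helper can be exercised without killing the process.

```python
import logging
import os
import threading
from typing import Callable, Optional

# Hypothetical process-level global for the SDK harness log handler.
_FN_LOG_HANDLER: Optional[logging.Handler] = None
_HANDLER_LOCK = threading.Lock()


def set_process_log_handler(handler: Optional[logging.Handler]) -> None:
  """Hypothetical: registers the handler the harness creates at startup."""
  global _FN_LOG_HANDLER
  with _HANDLER_LOCK:
    _FN_LOG_HANDLER = handler


def flush_logs_and_terminate(
    exit_code: int = 1,
    terminate: Callable[[int], None] = os._exit) -> None:
  """Hypothetical helper that is safe to call from any thread.

  sys.exit() called on a non-main thread only raises SystemExit in that
  thread, so this defaults to os._exit(). os._exit() skips interpreter
  cleanup (atexit handlers, logging shutdown), which is why the handler is
  flushed explicitly first.
  """
  with _HANDLER_LOCK:
    handler = _FN_LOG_HANDLER
  if handler is not None:
    try:
      handler.flush()
    except Exception:  # pylint: disable=broad-except
      pass  # A failed flush must not block the shutdown.
  terminate(exit_code)
```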
   
   Even better would be to catch the exception on the main thread, but that requires either passing messages from child threads to the main thread or refactoring thread management using `concurrent.futures`.
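For the `concurrent.futures` option, here is a minimal sketch of how an exception from a background task can surface on the main thread: `Future.result()` re-raises the exception that the task raised. `monitor_element_processing` and `run_harness` are hypothetical stand-ins, not the harness's actual thread layout.

```python
import concurrent.futures
import time


def monitor_element_processing(timeout_s: float) -> None:
  # Hypothetical stand-in for the status handler loop: pretend an element
  # has been stuck for longer than the timeout and report it by raising.
  time.sleep(timeout_s)
  raise TimeoutError(f'Element processing exceeded {timeout_s}s.')


def run_harness() -> str:
  with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    monitor = executor.submit(monitor_element_processing, 0.05)
    # The main thread waits for the first background task to fail. A real
    # harness would likely be waiting on several futures, not just this one.
    done, _ = concurrent.futures.wait(
        [monitor], return_when=concurrent.futures.FIRST_EXCEPTION)
    for future in done:
      try:
        # result() re-raises the background thread's exception here, on the
        # main thread, where logs could be flushed before exiting.
        future.result()
      except TimeoutError as e:
        return f'main thread caught: {e}'
  return 'no timeout'


print(run_harness())  # main thread caught: Element processing exceeded 0.05s.
```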



##########
sdks/python/apache_beam/runners/worker/sdk_worker.py:
##########
@@ -409,6 +416,11 @@ def create_worker(self):
     return SdkWorker(
         self._bundle_processor_cache, profiler_factory=self._profiler_factory)
 
+  def _shutdown_due_to_element_processing_timeout(
+      self, err: TimeoutError) -> None:
+    _LOGGER.error('%sThe SDK harness will be terminated.', str(err))

Review Comment:
   Is an extra space needed here (between `%s` and `The`)?
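To illustrate: with `'%sThe SDK harness ...'`, the error text and the next sentence are joined with no separator. The error text below is hypothetical (the real `TimeoutError` message is not in this diff); `logging.LogRecord.getMessage()` applies the same `msg % args` formatting that a logger call uses.

```python
import logging

# Hypothetical error text; the real TimeoutError message is not shown here.
err = TimeoutError('Element processing timed out.')


def render(fmt: str) -> str:
  # Builds the message the same way _LOGGER.error(fmt, str(err)) would.
  record = logging.LogRecord(
      'sdk_worker', logging.ERROR, 'sdk_worker.py', 0, fmt, (str(err),), None)
  return record.getMessage()


print(render('%sThe SDK harness will be terminated.'))
# Element processing timed out.The SDK harness will be terminated.
print(render('%s The SDK harness will be terminated.'))
# Element processing timed out. The SDK harness will be terminated.
```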



##########
sdks/python/apache_beam/runners/worker/worker_status.py:
##########
@@ -250,14 +255,36 @@ def _log_lull_in_bundle_processor(self, bundle_process_cache):
           if processor:
             info = processor.state_sampler.get_info()
             self._log_lull_sampler_info(info, instruction)
+            if self._element_processing_timeout_ns:
+              self._terminate_sdk_worker_lull(info, instruction)
+
+  def _terminate_sdk_worker_lull(self, sampler_info, instruction):

Review Comment:
   There is a lot of duplication between `_terminate_sdk_worker_lull` and `_log_lull_sampler_info`. Can we combine the logic into:
   
   ```
     def _handle_lull_in_bundle_processor(...):
        if lull_duration_seconds > log_lull_timeout:
           ... 
        if lull_duration_seconds > element_processing_timeout: 
           ...
   ```



-- 
This is an automated message from the Apache Git Service.