ryan-mbuashundip commented on code in PR #35391:
URL: https://github.com/apache/beam/pull/35391#discussion_r2246042021


##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1455,6 +1455,14 @@ def _add_argparse_args(cls, parser):
             'responsible for executing the user code and communicating with '
             'the runner. Depending on the runner, there may be more than one '
             'SDK Harness process running on the same worker node.'))
+    parser.add_argument(
+        '--element_processing_timeout_minutes',
+        type=int,
+        default=None,
+        help=(
+            'The time limit (minute) that an SDK worker allows for a PTransform'
+            ' operation to process one element before signaling the runner harness'
+            ' to restart the SDKworker.'))

Review Comment:
   ```suggestion
               ' to restart the SDK worker.'))
   ```



##########
sdks/python/apache_beam/options/pipeline_options_test.py:
##########
@@ -405,16 +405,25 @@ def test_experiments(self):
     self.assertEqual(options.get_all_options()['experiments'], None)
 
   def test_worker_options(self):
-    options = PipelineOptions(['--machine_type', 'abc', '--disk_type', 'def'])
+    options = PipelineOptions([
+        '--machine_type',
+        'abc',
+        '--disk_type',
+        'def',
+        '--element_processing_timeout_minutes',
+        '10',
+    ])
     worker_options = options.view_as(WorkerOptions)
     self.assertEqual(worker_options.machine_type, 'abc')
     self.assertEqual(worker_options.disk_type, 'def')
+    self.assertEqual(worker_options.element_processing_timeout_minutes, 10)
 
     options = PipelineOptions(
         ['--worker_machine_type', 'abc', '--worker_disk_type', 'def'])
     worker_options = options.view_as(WorkerOptions)
     self.assertEqual(worker_options.machine_type, 'abc')
     self.assertEqual(worker_options.disk_type, 'def')
+    self.assertIsNone(worker_options.element_processing_timeout_minutes)

Review Comment:
   This test case isn't necessary.



##########
sdks/python/apache_beam/runners/worker/worker_status.py:
##########
@@ -250,14 +255,36 @@ def _log_lull_in_bundle_processor(self, bundle_process_cache):
           if processor:
             info = processor.state_sampler.get_info()
             self._log_lull_sampler_info(info, instruction)
+            if self._element_processing_timeout_ns:
+              self._restart_lull(info, instruction)
+
+  def _restart_lull(self, sampler_info, instruction):

Review Comment:
   Nit: a name like "_terminate_sdk_worker_lull" better describes the scope of the method.



##########
sdks/python/apache_beam/runners/worker/worker_status_test.py:
##########
@@ -126,6 +127,26 @@ def get_state_sampler_info_for_lull(lull_duration_s):
         bundle_id, sampler_info = get_state_sampler_info_for_lull(21 * 60)
         self.fn_status_handler._log_lull_sampler_info(sampler_info, bundle_id)
 
+  def test_restart_lull_in_bundle_processor(self):
+    def get_state_sampler_info_for_lull(lull_duration_s):
+      return "bundle-id", statesampler.StateSamplerInfo(
+          CounterName('progress-msecs', 'stage_name', 'step_name'),
+          1,
+          lull_duration_s * 1e9,
+          threading.current_thread())

Review Comment:
   Since you're reusing this helper from the unit test above, you can move its definition outside the test methods so it only needs to be defined once instead of twice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to