ryan-mbuashundip commented on code in PR #35391: URL: https://github.com/apache/beam/pull/35391#discussion_r2246042021
########## sdks/python/apache_beam/options/pipeline_options.py: ########## @@ -1455,6 +1455,14 @@ def _add_argparse_args(cls, parser): 'responsible for executing the user code and communicating with ' 'the runner. Depending on the runner, there may be more than one ' 'SDK Harness process running on the same worker node.')) + parser.add_argument( + '--element_processing_timeout_minutes', + type=int, + default=None, + help=( + 'The time limit (minute) that an SDK worker allows for a PTransform' + ' operation to process one element before signaling the runner harness' + ' to restart the SDKworker.')) Review Comment: ```suggestion ' to restart the SDK worker.')) ``` ########## sdks/python/apache_beam/options/pipeline_options_test.py: ########## @@ -405,16 +405,25 @@ def test_experiments(self): self.assertEqual(options.get_all_options()['experiments'], None) def test_worker_options(self): - options = PipelineOptions(['--machine_type', 'abc', '--disk_type', 'def']) + options = PipelineOptions([ + '--machine_type', + 'abc', + '--disk_type', + 'def', + '--element_processing_timeout_minutes', + '10', + ]) worker_options = options.view_as(WorkerOptions) self.assertEqual(worker_options.machine_type, 'abc') self.assertEqual(worker_options.disk_type, 'def') + self.assertEqual(worker_options.element_processing_timeout_minutes, 10) options = PipelineOptions( ['--worker_machine_type', 'abc', '--worker_disk_type', 'def']) worker_options = options.view_as(WorkerOptions) self.assertEqual(worker_options.machine_type, 'abc') self.assertEqual(worker_options.disk_type, 'def') + self.assertIsNone(worker_options.element_processing_timeout_minutes) Review Comment: This test case isn't necessary. 
########## sdks/python/apache_beam/runners/worker/worker_status.py: ########## @@ -250,14 +255,36 @@ def _log_lull_in_bundle_processor(self, bundle_process_cache): if processor: info = processor.state_sampler.get_info() self._log_lull_sampler_info(info, instruction) + if self._element_processing_timeout_ns: + self._restart_lull(info, instruction) + + def _restart_lull(self, sampler_info, instruction): Review Comment: Nit: a name like "_terminate_sdk_worker_lull" better describes the scope of the method. ########## sdks/python/apache_beam/runners/worker/worker_status_test.py: ########## @@ -126,6 +127,26 @@ def get_state_sampler_info_for_lull(lull_duration_s): bundle_id, sampler_info = get_state_sampler_info_for_lull(21 * 60) self.fn_status_handler._log_lull_sampler_info(sampler_info, bundle_id) + def test_restart_lull_in_bundle_processor(self): + def get_state_sampler_info_for_lull(lull_duration_s): + return "bundle-id", statesampler.StateSamplerInfo( + CounterName('progress-msecs', 'stage_name', 'step_name'), + 1, + lull_duration_s * 1e9, + threading.current_thread()) Review Comment: Since you're reusing this helper function from the unit test above, you can move the definition outside the test methods so it only needs to be defined once instead of twice. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org