tvalentyn commented on code in PR #36518:
URL: https://github.com/apache/beam/pull/36518#discussion_r2433493117
##########
sdks/python/apache_beam/runners/worker/worker_status.py:
##########
@@ -247,16 +259,56 @@ def generate_status_response(self):
def close(self):
self._responses.put(DONE, timeout=5)
- def _log_lull_in_bundle_processor(self, bundle_process_cache):
+ def _log_lull_in_bundle_processor(self, bundle_processor_cache):
while True:
time.sleep(2 * 60)
- if bundle_process_cache and
bundle_process_cache.active_bundle_processors:
- for instruction in list(
- bundle_process_cache.active_bundle_processors.keys()):
- processor = bundle_process_cache.lookup(instruction)
- if processor:
- info = processor.state_sampler.get_info()
- self._log_lull_sampler_info(info, instruction)
+ if not bundle_processor_cache:
+ continue
+
+ for instruction in list(
+ bundle_processor_cache.active_bundle_processors.keys()):
+ processor = bundle_processor_cache.lookup(instruction)
+ if processor:
+ info = processor.state_sampler.get_info()
+ self._log_lull_sampler_info(info, instruction)
+
+ for instruction, (bundle_id, thread, creation_time) in list(
+ bundle_processor_cache.processors_being_created.items()):
+ self._log_lull_in_creating_bundle_descriptor(
+ instruction, bundle_id, thread, creation_time)
+
+ def _log_lull_in_creating_bundle_descriptor(
+ self, instruction, bundle_id, thread, creation_time):
+ time_since_creation_ns = (time.time() - creation_time) * 1e9
+
+ if (self._element_processing_timeout_ns and
+ time_since_creation_ns > self._element_processing_timeout_ns):
+ stack_trace = self._get_stack_trace(thread)
+ _LOGGER.error((
+ 'Creation of bundle processor for instruction %s (bundle %s) '
+ 'has exceeded the specified timeout of %.2f minutes. '
+ 'This might indicate stuckness in DoFn.setup() or in DoFn creation. '
+ 'SDK harness will be terminated.\n'
+ 'Current Traceback:\n%s'),
+ instruction,
+ bundle_id,
+ self._element_processing_timeout_ns / 1e9 / 60,
+ stack_trace)
+ from apache_beam.runners.worker.sdk_worker_main import
terminate_sdk_harness
+ terminate_sdk_harness()
+
+ if (time_since_creation_ns > self.log_lull_timeout_ns and
+ self._passed_lull_timeout_since_last_log()):
Review Comment:
yeah, reordering is a better logic. thanks.
##########
sdks/python/apache_beam/runners/worker/worker_status.py:
##########
@@ -247,16 +259,56 @@ def generate_status_response(self):
def close(self):
self._responses.put(DONE, timeout=5)
- def _log_lull_in_bundle_processor(self, bundle_process_cache):
+ def _log_lull_in_bundle_processor(self, bundle_processor_cache):
while True:
time.sleep(2 * 60)
- if bundle_process_cache and
bundle_process_cache.active_bundle_processors:
- for instruction in list(
- bundle_process_cache.active_bundle_processors.keys()):
- processor = bundle_process_cache.lookup(instruction)
- if processor:
- info = processor.state_sampler.get_info()
- self._log_lull_sampler_info(info, instruction)
+ if not bundle_processor_cache:
+ continue
+
+ for instruction in list(
+ bundle_processor_cache.active_bundle_processors.keys()):
+ processor = bundle_processor_cache.lookup(instruction)
+ if processor:
+ info = processor.state_sampler.get_info()
+ self._log_lull_sampler_info(info, instruction)
+
+ for instruction, (bundle_id, thread, creation_time) in list(
+ bundle_processor_cache.processors_being_created.items()):
+ self._log_lull_in_creating_bundle_descriptor(
+ instruction, bundle_id, thread, creation_time)
+
+ def _log_lull_in_creating_bundle_descriptor(
+ self, instruction, bundle_id, thread, creation_time):
+ time_since_creation_ns = (time.time() - creation_time) * 1e9
+
+ if (self._element_processing_timeout_ns and
+ time_since_creation_ns > self._element_processing_timeout_ns):
+ stack_trace = self._get_stack_trace(thread)
+ _LOGGER.error((
+ 'Creation of bundle processor for instruction %s (bundle %s) '
+ 'has exceeded the specified timeout of %.2f minutes. '
+ 'This might indicate stuckness in DoFn.setup() or in DoFn creation. '
+ 'SDK harness will be terminated.\n'
+ 'Current Traceback:\n%s'),
+ instruction,
+ bundle_id,
+ self._element_processing_timeout_ns / 1e9 / 60,
+ stack_trace)
+ from apache_beam.runners.worker.sdk_worker_main import
terminate_sdk_harness
+ terminate_sdk_harness()
+
+ if (time_since_creation_ns > self.log_lull_timeout_ns and
+ self._passed_lull_timeout_since_last_log()):
Review Comment:
yeah, reordering is a better logic. thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]