scwhittle commented on code in PR #36518:
URL: https://github.com/apache/beam/pull/36518#discussion_r2431884474
##########
sdks/python/apache_beam/runners/worker/worker_status.py:
##########
@@ -247,16 +259,56 @@ def generate_status_response(self):
def close(self):
self._responses.put(DONE, timeout=5)
- def _log_lull_in_bundle_processor(self, bundle_process_cache):
+ def _log_lull_in_bundle_processor(self, bundle_processor_cache):
while True:
time.sleep(2 * 60)
- if bundle_process_cache and
bundle_process_cache.active_bundle_processors:
- for instruction in list(
- bundle_process_cache.active_bundle_processors.keys()):
- processor = bundle_process_cache.lookup(instruction)
- if processor:
- info = processor.state_sampler.get_info()
- self._log_lull_sampler_info(info, instruction)
+ if not bundle_processor_cache:
+ continue
+
+ for instruction in list(
+ bundle_processor_cache.active_bundle_processors.keys()):
+ processor = bundle_processor_cache.lookup(instruction)
+ if processor:
+ info = processor.state_sampler.get_info()
+ self._log_lull_sampler_info(info, instruction)
+
+ for instruction, (bundle_id, thread, creation_time) in list(
+ bundle_processor_cache.processors_being_created.items()):
+ self._log_lull_in_creating_bundle_descriptor(
+ instruction, bundle_id, thread, creation_time)
+
+ def _log_lull_in_creating_bundle_descriptor(
+ self, instruction, bundle_id, thread, creation_time):
+ time_since_creation_ns = (time.time() - creation_time) * 1e9
+
+ if (self._element_processing_timeout_ns and
+ time_since_creation_ns > self._element_processing_timeout_ns):
+ stack_trace = self._get_stack_trace(thread)
+ _LOGGER.error((
+ 'Creation of bundle processor for instruction %s (bundle %s) '
+ 'has exceeded the specified timeout of %.2f minutes. '
+ 'This might indicate stuckness in DoFn.setup() or in DoFn creation. '
+ 'SDK harness will be terminated.\n'
+ 'Current Traceback:\n%s'),
+ instruction,
+ bundle_id,
+ self._element_processing_timeout_ns / 1e9 / 60,
+ stack_trace)
+ from apache_beam.runners.worker.sdk_worker_main import
terminate_sdk_harness
+ terminate_sdk_harness()
+
+ if (time_since_creation_ns > self.log_lull_timeout_ns and
+ self._passed_lull_timeout_since_last_log()):
Review Comment:
I think there is a bug in _log_lull_sampler_info below.
I think the call to _passed_lull_timeout_since_last_log() should be ordered
after the check that time_since_transition > log_lull_timeout_ns.
##########
sdks/python/apache_beam/runners/worker/worker_status.py:
##########
@@ -281,7 +333,7 @@ def _log_lull_sampler_info(self, sampler_info, instruction):
' for PTransform{name=%s, state=%s}' % (step_name, state_name))
else:
step_name_log = ''
- stack_trace = self._get_stack_trace(sampler_info)
+ stack_trace = self._get_stack_trace(sampler_info.tracked_thread)
Review Comment:
This was getattr(sampler_info, 'tracked_thread', None) before.
Are we guaranteed that this attribute exists?
##########
sdks/python/apache_beam/runners/worker/worker_status.py:
##########
@@ -149,6 +150,17 @@ def _active_processing_bundles_state(bundle_process_cache):
state += "time since transition: %.2f seconds\n" % (s[3] / 1e9)
active_bundles.append(state)
+ if bundle_processor_cache.processors_being_created:
+ active_bundles.append("Processors being created:\n")
+ for instruction, (bundle_id, thread, creation_time) in (
+ bundle_processor_cache.processors_being_created.items()):
+ state = '--- instruction %s ---\n' % instruction
+ state += 'ProcessBundleDescriptorId: %s\n' % bundle_id
+ state += "tracked thread: %s\n" % thread
+ state += "time since creation started: %.2f seconds\n" % (
+ time.time() - creation_time)
Review Comment:
Call time.time() once (before the loop) and reuse the value for every entry?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]